A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
98
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor database interaction for scrobbles, tracks, albums, and artists in DuckDB and Postgres repositories

- Implemented `save_scrobble` function in DuckDB to handle scrobble records, including user, album, artist, and track relations.
- Refactored `save_track` function in DuckDB to insert or update track records.
- Removed unnecessary `ScrobbleRepo`, `TrackRepo`, and `UserRepo` structs in DuckDB and replaced them with direct function implementations.
- Updated `Repo` trait to include async methods for inserting albums, artists, scrobbles, tracks, and users.
- Created `sync_scrobbles` function for synchronizing scrobbles from Postgres to DuckDB.
- Added new types for handling records: `ScrobbleRecord`, `ArtistRecord`, `AlbumRecord`, and `SongRecord`.
- Enhanced error handling and logging throughout the database operations.
- Updated command line interface to include a sync command for scrobbles.

+1288 -110
+2
Cargo.lock
··· 5372 5372 "reqwest", 5373 5373 "serde", 5374 5374 "serde_json", 5375 + "sha256", 5375 5376 "sqlx", 5376 5377 "tokio", 5377 5378 "tokio-stream", ··· 5379 5380 "tracing", 5380 5381 "tungstenite 0.26.2", 5381 5382 "warp", 5383 + "xid", 5382 5384 ] 5383 5385 5384 5386 [[package]]
+2
crates/feed/Cargo.toml
··· 39 39 ] } 40 40 tokio-stream = { version = "0.1.17", features = ["full"] } 41 41 async-trait = "0.1.89" 42 + xid = "1.1.1" 43 + sha256 = "1.6.0"
+40
crates/feed/src/did.rs
··· 1 + use anyhow::Error; 2 + 3 + use crate::types::{Profile, ProfileResponse}; 4 + 5 + pub async fn did_to_profile(did: &str) -> Result<Profile, Error> { 6 + let client = reqwest::Client::new(); 7 + let response = client 8 + .get(format!("https://plc.directory/{}", did)) 9 + .header("Accept", "application/json") 10 + .send() 11 + .await? 12 + .json::<serde_json::Value>() 13 + .await?; 14 + 15 + let handle = response["alsoKnownAs"][0] 16 + .as_str() 17 + .unwrap_or("") 18 + .split("at://") 19 + .last() 20 + .unwrap_or(""); 21 + 22 + let service_endpoint = response["service"][0]["serviceEndpoint"] 23 + .as_str() 24 + .unwrap_or(""); 25 + 26 + if service_endpoint.is_empty() { 27 + return Err(Error::msg("Invalid did")); 28 + } 29 + 30 + let client = reqwest::Client::new(); 31 + let mut response = client.get(format!("{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=app.bsky.actor.profile&rkey=self", service_endpoint, did)) 32 + .header("Accept", "application/json") 33 + .send() 34 + .await? 35 + .json::<ProfileResponse>() 36 + .await?; 37 + 38 + response.value.handle = Some(handle.to_string()); 39 + Ok(response.value) 40 + }
+4 -1
crates/feed/src/feed.rs
··· 125 125 ); 126 126 let subscriber = ScrobbleSubscriber::new(&url); 127 127 128 - subscriber.run().await?; 128 + match subscriber.run().await { 129 + Ok(_) => tracing::info!("Firehose listener exited normally"), 130 + Err(e) => tracing::error!(error = %e, "Firehose listener exited with error"), 131 + } 129 132 130 133 Ok::<(), Error>(()) 131 134 });
+1
crates/feed/src/feeds/discover/mod.rs
··· 1 +
+1
crates/feed/src/feeds/new/mod.rs
··· 1 +
+1
crates/feed/src/feeds/scrobbles/mod.rs
··· 1 +
+1
crates/feed/src/feeds/trending/mod.rs
··· 1 +
+12 -1
crates/feed/src/lib.rs
··· 7 7 use crate::{ 8 8 feed::Feed, 9 9 feed_handler::FeedHandler, 10 + repo::duckdb::DB_PATH, 10 11 types::{FeedResult, Scrobble}, 11 12 }; 12 13 13 14 pub mod config; 15 + pub mod did; 14 16 pub mod feed; 15 17 pub mod feed_handler; 16 18 pub mod feeds; 17 19 pub mod repo; 18 20 pub mod subscriber; 21 + pub mod sync; 19 22 pub mod types; 20 23 pub mod xata; 24 + 25 + pub const SCROBBLE_NSID: &str = "app.rocksky.scrobble"; 26 + pub const ARTIST_NSID: &str = "app.rocksky.artist"; 27 + pub const ALBUM_NSID: &str = "app.rocksky.album"; 28 + pub const SONG_NSID: &str = "app.rocksky.song"; 29 + pub const PLAYLIST_NSID: &str = "app.rocksky.playlist"; 30 + pub const LIKE_NSID: &str = "app.rocksky.like"; 31 + pub const SHOUT_NSID: &str = "app.rocksky.shout"; 21 32 22 33 pub struct RecentlyPlayedFeed { 23 34 handler: RecentlyPlayedFeedHandler, ··· 53 64 } 54 65 55 66 pub async fn run() -> Result<(), Error> { 56 - let conn = Connection::open("./rocksky-seed.ddb")?; 67 + let conn = Connection::open(DB_PATH)?; 57 68 let pool = PgPoolOptions::new() 58 69 .max_connections(5) 59 70 .connect(&env::var("XATA_POSTGRES_URL")?)
+57 -9
crates/feed/src/repo/duckdb/album.rs
··· 1 - use std::sync::Arc; 1 + use crate::{repo::duckdb::DB_PATH, types::AlbumRecord}; 2 + use anyhow::Error; 3 + use duckdb::params; 2 4 3 - use tokio::sync::Mutex; 5 + pub async fn save_album(uri: &str, record: AlbumRecord) -> Result<(), anyhow::Error> { 6 + let uri = uri.to_string(); 7 + tokio::task::spawn_blocking(move || -> Result<(), Error> { 8 + let conn = duckdb::Connection::open(DB_PATH)?; 4 9 5 - pub struct AlbumRepo { 6 - pub conn: Arc<Mutex<duckdb::Connection>>, 7 - } 10 + let album_hash = 11 + sha256::digest(format!("{} - {}", record.title, record.artist).to_lowercase()); 8 12 9 - impl AlbumRepo { 10 - pub fn new(conn: Arc<Mutex<duckdb::Connection>>) -> Self { 11 - Self { conn } 12 - } 13 + match conn.execute( 14 + "INSERT INTO albums ( 15 + id, 16 + title, 17 + artist, 18 + release_date, 19 + album_art, 20 + year, 21 + sha256, 22 + uri 23 + ) VALUES ( 24 + ?, 25 + ?, 26 + ?, 27 + ?, 28 + ?, 29 + ?, 30 + ?, 31 + ? 32 + ) ON CONFLICT (sha256) DO UPDATE SET 33 + uri = EXCLUDED.uri, 34 + year = EXCLUDED.year, 35 + album_art = EXCLUDED.album_art, 36 + release_date = EXCLUDED.release_date, 37 + artist = EXCLUDED.artist, 38 + title = EXCLUDED.title;", 39 + params![ 40 + xid::new().to_string(), 41 + record.title, 42 + record.artist, 43 + record.release_date, 44 + record.album_art_url, 45 + record.year, 46 + album_hash, 47 + uri 48 + ], 49 + ) { 50 + Ok(x) => tracing::info!("Album successfully inserted or updated: {}", x), 51 + Err(e) => tracing::error!(error = %e, "Error inserting/updating album"), 52 + } 53 + 54 + conn.close() 55 + .map_err(|(_, e)| Error::msg(format!("Error closing connection: {}", e)))?; 56 + Ok(()) 57 + }) 58 + .await??; 59 + 60 + Ok(()) 13 61 }
+55 -9
crates/feed/src/repo/duckdb/artist.rs
··· 1 - use std::sync::Arc; 1 + use crate::{repo::duckdb::DB_PATH, types::ArtistRecord}; 2 + use anyhow::Error; 3 + use duckdb::params; 2 4 3 - use tokio::sync::Mutex; 5 + pub async fn save_artist(uri: &str, record: ArtistRecord) -> Result<(), anyhow::Error> { 6 + let uri = uri.to_string(); 7 + tokio::task::spawn_blocking(move || -> Result<(), Error> { 8 + let conn = duckdb::Connection::open(DB_PATH)?; 4 9 5 - pub struct ArtistRepo { 6 - pub conn: Arc<Mutex<duckdb::Connection>>, 7 - } 10 + let artist_hash = sha256::digest(record.name.to_lowercase()); 11 + match conn.execute( 12 + &format!( 13 + "INSERT INTO artists ( 14 + id, 15 + name, 16 + picture, 17 + sha256, 18 + uri, 19 + tags 20 + ) VALUES ( 21 + ?, 22 + ?, 23 + ?, 24 + ?, 25 + ?, 26 + [{}] 27 + ) ON CONFLICT (sha256) DO UPDATE SET 28 + uri = EXCLUDED.uri, 29 + tags = EXCLUDED.tags;", 30 + record 31 + .tags 32 + .as_ref() 33 + .map(|tags| tags 34 + .iter() 35 + .map(|tag| format!("'{}'", tag)) 36 + .collect::<Vec<_>>() 37 + .join(", ")) 38 + .unwrap_or_default() 39 + ), 40 + params![ 41 + xid::new().to_string(), 42 + record.name, 43 + record.picture_url, 44 + artist_hash, 45 + uri 46 + ], 47 + ) { 48 + Ok(x) => tracing::info!("Artist successfully inserted or updated: {}", x), 49 + Err(e) => tracing::error!(error = %e, "Error inserting/updating artist"), 50 + } 8 51 9 - impl ArtistRepo { 10 - pub fn new(conn: Arc<Mutex<duckdb::Connection>>) -> Self { 11 - Self { conn } 12 - } 52 + conn.close() 53 + .map_err(|(_, e)| Error::msg(format!("Error closing connection: {}", e)))?; 54 + Ok(()) 55 + }) 56 + .await??; 57 + 58 + Ok(()) 13 59 }
+212 -23
crates/feed/src/repo/duckdb/mod.rs
··· 1 1 use std::sync::Arc; 2 2 3 - use crate::repo::duckdb::{ 4 - album::AlbumRepo, artist::ArtistRepo, scrobble::ScrobbleRepo, track::TrackRepo, 3 + use crate::{ 4 + repo::duckdb::{ 5 + album::save_album, artist::save_artist, scrobble::save_scrobble, track::save_track, 6 + user::save_user, 7 + }, 8 + types::{AlbumRecord, ArtistRecord, ScrobbleRecord, SongRecord}, 5 9 }; 6 10 7 11 use super::Repo; ··· 15 19 pub mod track; 16 20 pub mod user; 17 21 22 + pub const DB_PATH: &str = "./rocksky-feed.ddb"; 23 + 24 + #[derive(Clone)] 18 25 pub struct DuckdbRepo { 19 - pub album: AlbumRepo, 20 - pub atist: ArtistRepo, 21 - pub scrobble: ScrobbleRepo, 22 - pub track: TrackRepo, 26 + pub conn: Arc<Mutex<duckdb::Connection>>, 23 27 } 24 28 25 29 impl DuckdbRepo { 26 30 pub async fn new() -> Result<Self, Error> { 27 - let conn = duckdb::Connection::open("./rocksky-seed.ddb")?; 31 + let conn = duckdb::Connection::open(DB_PATH)?; 28 32 let conn = Arc::new(Mutex::new(conn)); 29 - Ok(Self { 30 - album: AlbumRepo::new(conn.clone()), 31 - atist: ArtistRepo::new(conn.clone()), 32 - scrobble: ScrobbleRepo::new(conn.clone()), 33 - track: TrackRepo::new(conn.clone()), 34 - }) 33 + Ok(Self { conn: conn.clone() }) 35 34 } 36 35 } 37 36 38 37 #[async_trait] 39 38 impl Repo for DuckdbRepo { 40 - async fn insert_album(self) -> Result<(), anyhow::Error> { 41 - todo!() 39 + async fn insert_album(self, uri: &str, record: AlbumRecord) -> Result<(), anyhow::Error> { 40 + save_album(uri, record).await 42 41 } 43 42 44 - async fn insert_artist(self) -> Result<(), anyhow::Error> { 45 - todo!() 43 + async fn insert_artist(self, uri: &str, record: ArtistRecord) -> Result<(), anyhow::Error> { 44 + save_artist(uri, record).await 46 45 } 47 46 48 - async fn insert_scrobble(self) -> Result<(), anyhow::Error> { 49 - todo!() 47 + async fn insert_scrobble( 48 + self, 49 + did: &str, 50 + uri: &str, 51 + record: ScrobbleRecord, 52 + ) -> Result<(), anyhow::Error> { 53 + save_scrobble(did, uri, record).await 50 54 } 51 55 52 - async fn insert_track(self) -> Result<(), anyhow::Error> { 53 - todo!() 56 + async fn insert_track(self, uri: &str, record: SongRecord) -> Result<(), anyhow::Error> { 57 + save_track(uri, record).await 54 58 } 55 59 56 - async fn insert_user(self) -> Result<(), anyhow::Error> { 57 - todo!() 60 + async fn insert_user(self, did: &str) -> Result<(), anyhow::Error> { 61 + save_user(did).await 58 62 } 59 63 60 64 async fn get_albums(self) -> Result<(), anyhow::Error> { ··· 91 95 92 96 async fn get_user(self) -> Result<(), anyhow::Error> { 93 97 todo!() 98 + } 99 + 100 + async fn create_tables(self) -> Result<(), anyhow::Error> { 101 + let conn = self.conn.lock().await; 102 + conn.execute_batch( 103 + "BEGIN; 104 + CREATE TABLE IF NOT EXISTS artists ( 105 + id VARCHAR PRIMARY KEY, 106 + name VARCHAR NOT NULL, 107 + biography TEXT, 108 + born DATE, 109 + born_in VARCHAR, 110 + died DATE, 111 + picture VARCHAR, 112 + sha256 VARCHAR UNIQUE NOT NULL, 113 + spotify_link VARCHAR, 114 + tidal_link VARCHAR, 115 + youtube_link VARCHAR, 116 + apple_music_link VARCHAR, 117 + uri VARCHAR UNIQUE, 118 + tags VARCHAR[], 119 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 120 + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 121 + ); 122 + CREATE TABLE IF NOT EXISTS albums ( 123 + id VARCHAR PRIMARY KEY, 124 + title VARCHAR NOT NULL, 125 + artist VARCHAR NOT NULL, 126 + release_date DATE, 127 + album_art VARCHAR, 128 + year INTEGER, 129 + spotify_link VARCHAR, 130 + tidal_link VARCHAR, 131 + youtube_link VARCHAR, 132 + apple_music_link VARCHAR, 133 + sha256 VARCHAR UNIQUE NOT NULL, 134 + uri VARCHAR UNIQUE, 135 + artist_uri VARCHAR, 136 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 137 + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 138 + ); 139 + CREATE TABLE IF NOT EXISTS tracks ( 140 + id VARCHAR PRIMARY KEY, 141 + title VARCHAR, 142 + artist VARCHAR, 143 + album_artist VARCHAR, 144 + album_art VARCHAR, 145 + album VARCHAR, 146 + track_number INTEGER, 147 + duration INTEGER, 148 + mb_id VARCHAR, 149 + youtube_link VARCHAR, 150 + spotify_link VARCHAR, 151 + tidal_link VARCHAR, 152 + apple_music_link VARCHAR, 153 + sha256 VARCHAR UNIQUE NOT NULL, 154 + lyrics TEXT, 155 + composer VARCHAR, 156 + genre VARCHAR, 157 + disc_number INTEGER, 158 + copyright_message VARCHAR, 159 + label VARCHAR, 160 + uri VARCHAR UNIQUE, 161 + artist_uri VARCHAR, 162 + album_uri VARCHAR, 163 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 164 + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 165 + ); 166 + CREATE TABLE IF NOT EXISTS album_tracks ( 167 + id VARCHAR PRIMARY KEY, 168 + album_id VARCHAR, 169 + track_id VARCHAR, 170 + FOREIGN KEY (album_id) REFERENCES albums(id), 171 + FOREIGN KEY (track_id) REFERENCES tracks(id), 172 + ); 173 + CREATE TABLE IF NOT EXISTS users ( 174 + id VARCHAR PRIMARY KEY, 175 + display_name VARCHAR, 176 + did VARCHAR UNIQUE NOT NULL, 177 + handle VARCHAR UNIQUE NOT NULL, 178 + avatar VARCHAR, 179 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 180 + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 181 + ); 182 + CREATE TABLE IF NOT EXISTS playlists ( 183 + id VARCHAR PRIMARY KEY, 184 + name VARCHAR NOT NULL, 185 + description TEXT, 186 + picture VARCHAR, 187 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 188 + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 189 + uri VARCHAR UNIQUE, 190 + created_by VARCHAR NOT NULL, 191 + FOREIGN KEY (created_by) REFERENCES users(id), 192 + ); 193 + CREATE TABLE IF NOT EXISTS playlist_tracks ( 194 + id VARCHAR PRIMARY KEY, 195 + playlist_id VARCHAR NOT NULL, 196 + track_id VARCHAR NOT NULL, 197 + added_by VARCHAR, 198 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 199 + FOREIGN KEY (playlist_id) REFERENCES playlists(id), 200 + FOREIGN KEY (track_id) REFERENCES tracks(id), 201 + ); 202 + CREATE TABLE IF NOT EXISTS user_tracks ( 203 + id VARCHAR PRIMARY KEY, 204 + user_id VARCHAR NOT NULL, 205 + track_id VARCHAR NOT NULL, 206 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 207 + FOREIGN KEY (user_id) REFERENCES users(id), 208 + FOREIGN KEY (track_id) REFERENCES tracks(id), 209 + ); 210 + CREATE TABLE IF NOT EXISTS user_albums ( 211 + id VARCHAR PRIMARY KEY, 212 + user_id VARCHAR, 213 + album_id VARCHAR, 214 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 215 + FOREIGN KEY (user_id) REFERENCES users(id), 216 + FOREIGN KEY (album_id) REFERENCES albums(id), 217 + ); 218 + CREATE TABLE IF NOT EXISTS user_artists ( 219 + id VARCHAR PRIMARY KEY, 220 + user_id VARCHAR NOT NULL, 221 + artist_id VARCHAR NOT NULL, 222 + created_at TIMESTAMP, 223 + FOREIGN KEY (user_id) REFERENCES users(id), 224 + FOREIGN KEY (artist_id) REFERENCES artists(id), 225 + ); 226 + CREATE TABLE IF NOT EXISTS user_playlists ( 227 + id VARCHAR PRIMARY KEY, 228 + user_id VARCHAR NOT NULL, 229 + playlist_id VARCHAR NOT NULL, 230 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 231 + FOREIGN KEY (user_id) REFERENCES users(id), 232 + FOREIGN KEY (playlist_id) REFERENCES playlists(id), 233 + ); 234 + CREATE TABLE IF NOT EXISTS loved_tracks ( 235 + id VARCHAR PRIMARY KEY, 236 + user_id VARCHAR NOT NULL, 237 + track_id VARCHAR NOT NULL, 238 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 239 + FOREIGN KEY (user_id) REFERENCES users(id), 240 + FOREIGN KEY (track_id) REFERENCES tracks(id), 241 + ); 242 + CREATE TABLE IF NOT EXISTS artist_tracks ( 243 + id VARCHAR PRIMARY KEY, 244 + artist_id VARCHAR NOT NULL, 245 + track_id VARCHAR NOT NULL, 246 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 247 + FOREIGN KEY (artist_id) REFERENCES artists(id), 248 + FOREIGN KEY (track_id) REFERENCES tracks(id), 249 + ); 250 + CREATE TABLE IF NOT EXISTS artist_albums ( 251 + id VARCHAR PRIMARY KEY, 252 + artist_id VARCHAR NOT NULL, 253 + album_id VARCHAR NOT NULL, 254 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 255 + FOREIGN KEY (artist_id) REFERENCES artists(id), 256 + FOREIGN KEY (album_id) REFERENCES albums(id), 257 + ); 258 + CREATE TABLE IF NOT EXISTS album_tracks ( 259 + id VARCHAR PRIMARY KEY, 260 + album_id VARCHAR NOT NULL, 261 + track_id VARCHAR NOT NULL, 262 + FOREIGN KEY (album_id) REFERENCES albums(id), 263 + FOREIGN KEY (track_id) REFERENCES tracks(id), 264 + ); 265 + CREATE TABLE IF NOT EXISTS scrobbles ( 266 + id VARCHAR PRIMARY KEY, 267 + user_id VARCHAR NOT NULL, 268 + track_id VARCHAR NOT NULL, 269 + album_id VARCHAR NOT NULL, 270 + artist_id VARCHAR NOT NULL, 271 + uri VARCHAR UNIQUE, 272 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 273 + FOREIGN KEY (user_id) REFERENCES users(id), 274 + FOREIGN KEY (track_id) REFERENCES tracks(id), 275 + FOREIGN KEY (album_id) REFERENCES albums(id), 276 + FOREIGN KEY (artist_id) REFERENCES artists(id), 277 + ); 278 + COMMIT; 279 + ", 280 + )?; 281 + 282 + Ok(()) 94 283 } 95 284 }
+327 -9
crates/feed/src/repo/duckdb/scrobble.rs
··· 1 - use std::sync::Arc; 1 + use anyhow::Error; 2 + use duckdb::{params, OptionalExt}; 2 3 3 - use tokio::sync::Mutex; 4 + use crate::{did::did_to_profile, repo::duckdb::DB_PATH, types::ScrobbleRecord}; 5 + 6 + pub async fn save_scrobble( 7 + did: &str, 8 + uri: &str, 9 + record: ScrobbleRecord, 10 + ) -> Result<(), anyhow::Error> { 11 + let did = did.to_string(); 12 + let cloned_did = did.clone(); 4 13 5 - pub struct ScrobbleRepo { 6 - pub conn: Arc<Mutex<duckdb::Connection>>, 7 - } 14 + let uri = uri.to_string(); 8 15 9 - impl ScrobbleRepo { 10 - pub fn new(conn: Arc<Mutex<duckdb::Connection>>) -> Self { 11 - Self { conn } 12 - } 16 + tokio::task::spawn_blocking(move || -> Result<(), Error> { 17 + let mut conn = duckdb::Connection::open(DB_PATH)?; 18 + let tx = conn.transaction()?; 19 + let mut user = tx.prepare("SELECT id FROM users WHERE did = ?")?; 20 + let user_id: Option<String> = user.query_row(params![did], |row| row.get(0)).optional()?; 21 + 22 + if user_id.is_none() { 23 + let rt = tokio::runtime::Runtime::new()?; 24 + let profile = rt.block_on(did_to_profile(&did))?; 25 + 26 + let avatar = profile.avatar.map(|blob| { 27 + format!( 28 + "https://cdn.bsky.app/img/avatar/plain/{}/{}@{}", 29 + did, 30 + blob.r#ref.link, 31 + blob.mime_type.split('/').last().unwrap_or("jpeg") 32 + ) 33 + }); 34 + 35 + tx.execute( 36 + "INSERT OR IGNORE INTO users ( 37 + id, 38 + display_name, 39 + did, 40 + handle, 41 + avatar 42 + ) VALUES ( 43 + ?, 44 + ?, 45 + ?, 46 + ?, 47 + ?)", 48 + params![ 49 + xid::new().to_string(), 50 + profile.display_name.unwrap_or_default(), 51 + did, 52 + profile.handle.unwrap_or_default(), 53 + avatar, 54 + ], 55 + )?; 56 + } 57 + 58 + let album_hash = 59 + sha256::digest(format!("{} - {}", record.album, record.album_artist).to_lowercase()); 60 + 61 + match tx.execute( 62 + "INSERT OR IGNORE INTO albums ( 63 + id, 64 + title, 65 + artist, 66 + release_date, 67 + album_art, 68 + year, 69 + sha256 70 + ) VALUES ( 71 + ?, 72 + ?, 73 + ?, 74 + ?, 75 + ?, 76 + ?, 77 + ? 78 + )", 79 + params![ 80 + xid::new().to_string(), 81 + record.album, 82 + record.album_artist, 83 + record.release_date, 84 + record.album_art_url, 85 + record.year, 86 + album_hash, 87 + ], 88 + ) { 89 + Ok(x) => tracing::info!("Album inserted or already exists {}", x), 90 + Err(e) => tracing::error!(error = %e, "Error inserting album"), 91 + } 92 + 93 + let artist_hash = sha256::digest(record.album_artist.to_lowercase()); 94 + match tx.execute( 95 + &format!( 96 + "INSERT OR IGNORE INTO artists ( 97 + id, 98 + name, 99 + sha256, 100 + tags 101 + ) VALUES ( 102 + ?, 103 + ?, 104 + ?, 105 + [{}] 106 + )", 107 + record 108 + .tags 109 + .as_ref() 110 + .map(|tags| tags 111 + .iter() 112 + .map(|tag| format!("'{}'", tag)) 113 + .collect::<Vec<_>>() 114 + .join(", ")) 115 + .unwrap_or_default() 116 + ), 117 + params![xid::new().to_string(), record.album_artist, artist_hash], 118 + ) { 119 + Ok(x) => tracing::info!("Artist inserted or already exists {}", x), 120 + Err(e) => tracing::error!(error = %e, "Error inserting artist"), 121 + } 122 + 123 + let track_hash = sha256::digest( 124 + format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(), 125 + ); 126 + match tx.execute( 127 + "INSERT OR IGNORE INTO tracks ( 128 + id, 129 + title, 130 + artist, 131 + album_artist, 132 + album_art, 133 + album, 134 + track_number, 135 + disc_number, 136 + spotify_link, 137 + tidal_link, 138 + youtube_link, 139 + apple_music_link, 140 + copyright_message, 141 + label, 142 + lyrics, 143 + composer, 144 + duration, 145 + mb_id, 146 + sha256 147 + ) VALUES ( 148 + ?, 149 + ?, 150 + ?, 151 + ?, 152 + ?, 153 + ?, 154 + ?, 155 + ?, 156 + ?, 157 + ?, 158 + ?, 159 + ?, 160 + ?, 161 + ?, 162 + ?, 163 + ?, 164 + ?, 165 + ?, 166 + ? 167 + )", 168 + params![ 169 + xid::new().to_string(), 170 + record.title, 171 + record.artist, 172 + record.album_artist, 173 + record.album_art_url, 174 + record.album, 175 + record.track_number, 176 + record.disc_number, 177 + record.spotify_link, 178 + record.tidal_link, 179 + record.youtube_link, 180 + record.apple_music_link, 181 + record.copyright_message, 182 + record.label, 183 + record.lyrics, 184 + record.composer, 185 + record.duration, 186 + record.mbid, 187 + track_hash, 188 + ], 189 + ) { 190 + Ok(x) => tracing::info!("Track inserted or already exists {}", x), 191 + Err(e) => tracing::error!(error = %e, "Error inserting track"), 192 + } 193 + 194 + match tx.execute( 195 + "INSERT OR IGNORE INTO album_tracks ( 196 + id, 197 + album_id, 198 + track_id 199 + ) VALUES ( 200 + ?, 201 + (SELECT id FROM albums WHERE sha256 = ?), 202 + (SELECT id FROM tracks WHERE sha256 = ?), 203 + )", 204 + params![xid::new().to_string(), album_hash, track_hash], 205 + ) { 206 + Ok(x) => tracing::info!("Album-Track relation inserted or already exists {}", x), 207 + Err(e) => tracing::error!(error = %e, "Error inserting album-track relation"), 208 + } 209 + 210 + match tx.execute( 211 + "INSERT OR IGNORE INTO user_artists ( 212 + id, 213 + user_id, 214 + artist_id 215 + ) VALUES ( 216 + ?, 217 + (SELECT id FROM users WHERE did = ?), 218 + (SELECT id FROM artists WHERE sha256 = ?), 219 + )", 220 + params![xid::new().to_string(), cloned_did, artist_hash], 221 + ) { 222 + Ok(x) => tracing::info!("User-Artist relation inserted or already exists {}", x), 223 + Err(e) => tracing::error!(error = %e, "Error inserting user-artist relation"), 224 + } 225 + 226 + match tx.execute( 227 + "INSERT OR IGNORE INTO user_albums ( 228 + id, 229 + user_id, 230 + album_id 231 + ) VALUES ( 232 + ?, 233 + (SELECT id FROM users WHERE did = ?), 234 + (SELECT id FROM albums WHERE sha256 = ?), 235 + )", 236 + params![xid::new().to_string(), cloned_did, album_hash], 237 + ) { 238 + Ok(x) => tracing::info!("User-Album relation inserted or already exists {}", x), 239 + Err(e) => tracing::error!(error = %e, "Error inserting user-album relation"), 240 + } 241 + 242 + match tx.execute( 243 + "INSERT OR IGNORE INTO user_tracks ( 244 + id, 245 + user_id, 246 + track_id 247 + ) VALUES ( 248 + ?, 249 + (SELECT id FROM users WHERE did = ?), 250 + (SELECT id FROM tracks WHERE sha256 = ?), 251 + )", 252 + params![xid::new().to_string(), cloned_did, track_hash], 253 + ) { 254 + Ok(x) => tracing::info!("User-Track relation inserted or already exists {}", x), 255 + Err(e) => tracing::error!(error = %e, "Error inserting user-track relation"), 256 + } 257 + 258 + match tx.execute( 259 + "INSERT OR IGNORE INTO artist_albums ( 260 + id, 261 + artist_id, 262 + album_id 263 + ) VALUES ( 264 + ?, 265 + (SELECT id FROM artists WHERE sha256 = ?), 266 + (SELECT id FROM albums WHERE sha256 = ?), 267 + )", 268 + params![xid::new().to_string(), artist_hash, album_hash], 269 + ) { 270 + Ok(x) => tracing::info!("Artist-Album relation inserted or already exists {}", x), 271 + Err(e) => tracing::error!(error = %e, "Error inserting artist-album relation"), 272 + } 273 + 274 + match tx.execute( 275 + "INSERT OR IGNORE INTO artist_tracks ( 276 + id, 277 + artist_id, 278 + track_id 279 + ) VALUES ( 280 + ?, 281 + (SELECT id FROM artists WHERE sha256 = ?), 282 + (SELECT id FROM tracks WHERE sha256 = ?), 283 + )", 284 + params![xid::new().to_string(), artist_hash, track_hash], 285 + ) { 286 + Ok(x) => tracing::info!("Artist-Track relation inserted or already exists {}", x), 287 + Err(e) => tracing::error!(error = %e, "Error inserting artist-track relation"), 288 + } 289 + 290 + match tx.execute( 291 + "INSERT OR IGNORE INTO scrobbles ( 292 + id, 293 + user_id, 294 + track_id, 295 + album_id, 296 + artist_id, 297 + uri, 298 + created_at 299 + ) VALUES ( 300 + ?, 301 + (SELECT id FROM users WHERE did = ?), 302 + (SELECT id FROM tracks WHERE sha256 = ?), 303 + (SELECT id FROM albums WHERE sha256 = ?), 304 + (SELECT id FROM artists WHERE sha256 = ?), 305 + ?, 306 + CURRENT_TIMESTAMP, 307 + )", 308 + params![ 309 + xid::new().to_string(), 310 + cloned_did, 311 + track_hash, 312 + album_hash, 313 + artist_hash, 314 + uri, 315 + ], 316 + ) { 317 + Ok(x) => tracing::info!("Scrobble inserted {}", x), 318 + Err(e) => tracing::error!(error = %e, "Error inserting scrobble"), 319 + } 320 + 321 + tx.commit()?; 322 + 323 + conn.close() 324 + .map_err(|(_, e)| Error::msg(format!("Error closing connection: {}", e)))?; 325 + 326 + Ok(()) 327 + }) 328 + .await??; 329 + 330 + Ok(()) 13 331 }
+89 -9
crates/feed/src/repo/duckdb/track.rs
··· 1 - use std::sync::Arc; 1 + use crate::{repo::duckdb::DB_PATH, types::SongRecord}; 2 + use anyhow::Error; 3 + use duckdb::params; 2 4 3 - use tokio::sync::Mutex; 5 + pub async fn save_track(uri: &str, record: SongRecord) -> Result<(), anyhow::Error> { 6 + let uri = uri.to_string(); 7 + tokio::task::spawn_blocking(move || -> Result<(), Error> { 8 + let conn = duckdb::Connection::open(DB_PATH)?; 9 + let track_hash = sha256::digest( 10 + format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(), 11 + ); 4 12 5 - pub struct TrackRepo { 6 - pub conn: Arc<Mutex<duckdb::Connection>>, 7 - } 13 + match conn.execute( 14 + "INSERT INTO tracks ( 15 + id, 16 + title, 17 + artist, 18 + album_artist, 19 + album_art, 20 + album, 21 + track_number, 22 + disc_number, 23 + spotify_link, 24 + tidal_link, 25 + youtube_link, 26 + apple_music_link, 27 + copyright_message, 28 + label, 29 + lyrics, 30 + composer, 31 + duration, 32 + mb_id, 33 + sha256, 34 + uri 35 + ) VALUES ( 36 + ?, 37 + ?, 38 + ?, 39 + ?, 40 + ?, 41 + ?, 42 + ?, 43 + ?, 44 + ?, 45 + ?, 46 + ?, 47 + ?, 48 + ?, 49 + ?, 50 + ?, 51 + ?, 52 + ?, 53 + ?, 54 + ?, 55 + ? 56 + ) ON CONFLICT (sha256) DO UPDATE SET 57 + uri = EXCLUDED.uri; 58 + ", 59 + params![ 60 + xid::new().to_string(), 61 + record.title, 62 + record.artist, 63 + record.album_artist, 64 + record.album_art_url, 65 + record.album, 66 + record.track_number, 67 + record.disc_number, 68 + record.spotify_link, 69 + record.tidal_link, 70 + record.youtube_link, 71 + record.apple_music_link, 72 + record.copyright_message, 73 + record.label, 74 + record.lyrics, 75 + record.composer, 76 + record.duration, 77 + record.mbid, 78 + track_hash, 79 + uri 80 + ], 81 + ) { 82 + Ok(x) => tracing::info!("Track successfully inserted or updated: {}", x), 83 + Err(e) => tracing::error!(error = %e, "Error inserting/updating track"), 84 + } 8 85 9 - impl TrackRepo { 10 - pub fn new(conn: Arc<Mutex<duckdb::Connection>>) -> Self { 11 - Self { conn } 12 - } 86 + conn.close() 87 + .map_err(|(_, e)| Error::msg(format!("Error closing connection: {}", e)))?; 88 + Ok(()) 89 + }) 90 + .await??; 91 + 92 + Ok(()) 13 93 }
+2 -12
crates/feed/src/repo/duckdb/user.rs
··· 1 - use std::sync::Arc; 2 - 3 - use tokio::sync::Mutex; 4 - 5 - pub struct UserRepo { 6 - pub conn: Arc<Mutex<duckdb::Connection>>, 7 - } 8 - 9 - impl UserRepo { 10 - pub fn new(conn: Arc<Mutex<duckdb::Connection>>) -> Self { 11 - Self { conn } 12 - } 1 + pub async fn save_user(_did: &str) -> Result<(), anyhow::Error> { 2 + todo!() 13 3 }
+137 -6
crates/feed/src/repo/mod.rs
··· 1 1 use anyhow::Error; 2 2 use async_trait::async_trait; 3 3 4 + use crate::{ 5 + repo::{duckdb::DuckdbRepo, postgres::PostgresRepo}, 6 + types::{AlbumRecord, ArtistRecord, ScrobbleRecord, SongRecord}, 7 + }; 8 + 4 9 pub mod duckdb; 5 10 pub mod postgres; 6 11 7 12 #[async_trait] 8 - pub trait Repo { 9 - async fn insert_album(self) -> Result<(), Error>; 10 - async fn insert_artist(self) -> Result<(), Error>; 11 - async fn insert_scrobble(self) -> Result<(), Error>; 12 - async fn insert_track(self) -> Result<(), Error>; 13 - async fn insert_user(self) -> Result<(), Error>; 13 + pub trait Repo: Send + Sync + Clone { 14 + async fn insert_album(self, uri: &str, record: AlbumRecord) -> Result<(), Error>; 15 + async fn insert_artist(self, uri: &str, record: ArtistRecord) -> Result<(), Error>; 16 + async fn insert_scrobble( 17 + self, 18 + did: &str, 19 + uri: &str, 20 + record: ScrobbleRecord, 21 + ) -> Result<(), Error>; 22 + async fn insert_track(self, uri: &str, record: SongRecord) -> Result<(), Error>; 23 + async fn insert_user(self, did: &str) -> Result<(), Error>; 14 24 async fn get_albums(self) -> Result<(), Error>; 15 25 async fn get_artists(self) -> Result<(), Error>; 16 26 async fn get_scrobbles(self) -> Result<(), Error>; ··· 20 30 async fn get_artist(self) -> Result<(), Error>; 21 31 async fn get_track(self) -> Result<(), Error>; 22 32 async fn get_user(self) -> Result<(), Error>; 33 + async fn create_tables(self) -> Result<(), Error>; 34 + } 35 + 36 + pub enum RepoImpl { 37 + Duckdb(DuckdbRepo), 38 + Postgres(PostgresRepo), 39 + } 40 + 41 + impl Clone for RepoImpl { 42 + fn clone(&self) -> Self { 43 + match self { 44 + RepoImpl::Duckdb(repo) => RepoImpl::Duckdb(repo.clone()), 45 + RepoImpl::Postgres(repo) => RepoImpl::Postgres(repo.clone()), 46 + } 47 + } 48 + } 49 + 50 + #[async_trait] 51 + impl Repo for RepoImpl { 52 + async fn insert_album(self, uri: &str, record: AlbumRecord) -> Result<(), Error> { 53 + match self { 54 + RepoImpl::Duckdb(repo) => repo.insert_album(uri, record).await, 55 + RepoImpl::Postgres(repo) => repo.insert_album(uri, record).await, 56 + } 57 + } 58 + 59 + async fn insert_artist(self, uri: &str, record: ArtistRecord) -> Result<(), Error> { 60 + match self { 61 + RepoImpl::Duckdb(repo) => repo.insert_artist(uri, record).await, 62 + RepoImpl::Postgres(repo) => repo.insert_artist(uri, record).await, 63 + } 64 + } 65 + 66 + async fn insert_scrobble( 67 + self, 68 + did: &str, 69 + uri: &str, 70 + record: ScrobbleRecord, 71 + ) -> Result<(), Error> { 72 + match self { 73 + RepoImpl::Duckdb(repo) => repo.insert_scrobble(did, uri, record).await, 74 + RepoImpl::Postgres(repo) => repo.insert_scrobble(did, uri, record).await, 75 + } 76 + } 77 + 78 + async fn insert_track(self, uri: &str, record: SongRecord) -> Result<(), Error> { 79 + match self { 80 + RepoImpl::Duckdb(repo) => repo.insert_track(uri, record).await, 81 + RepoImpl::Postgres(repo) => repo.insert_track(uri, record).await, 82 + } 83 + } 84 + 85 + async fn insert_user(self, did: &str) -> Result<(), Error> { 86 + match self { 87 + RepoImpl::Duckdb(repo) => repo.insert_user(did).await, 88 + RepoImpl::Postgres(repo) => repo.insert_user(did).await, 89 + } 90 + } 91 + 92 + async fn get_albums(self) -> Result<(), Error> { 93 + match self { 94 + RepoImpl::Duckdb(repo) => repo.get_albums().await, 95 + RepoImpl::Postgres(repo) => repo.get_albums().await, 96 + } 97 + } 98 + 99 + async fn get_artists(self) -> Result<(), Error> { 100 + match self { 101 + RepoImpl::Duckdb(repo) => repo.get_artists().await, 102 + RepoImpl::Postgres(repo) => repo.get_artists().await, 103 + } 104 + } 105 + 106 + async fn get_scrobbles(self) -> Result<(), Error> { 107 + match self { 108 + RepoImpl::Duckdb(repo) => repo.get_scrobbles().await, 109 + RepoImpl::Postgres(repo) => repo.get_scrobbles().await, 110 + } 111 + } 112 + async fn get_tracks(self) -> Result<(), Error> { 113 + match self { 114 + RepoImpl::Duckdb(repo) => repo.get_tracks().await, 115 + RepoImpl::Postgres(repo) => repo.get_tracks().await, 116 + } 117 + } 118 + async fn get_users(self) -> Result<(), Error> { 119 + match self { 120 + RepoImpl::Duckdb(repo) => repo.get_users().await, 121 + RepoImpl::Postgres(repo) => repo.get_users().await, 122 + } 123 + } 124 + async fn get_album(self) -> Result<(), Error> { 125 + match self { 126 + RepoImpl::Duckdb(repo) => repo.get_album().await, 127 + RepoImpl::Postgres(repo) => repo.get_album().await, 128 + } 129 + } 130 + async fn get_artist(self) -> Result<(), Error> { 131 + match self { 132 + RepoImpl::Duckdb(repo) => repo.get_artist().await, 133 + RepoImpl::Postgres(repo) => repo.get_artist().await, 134 + } 135 + } 136 + async fn get_track(self) -> Result<(), Error> { 137 + match self { 138 + RepoImpl::Duckdb(repo) => repo.get_track().await, 139 + RepoImpl::Postgres(repo) => repo.get_track().await, 140 + } 141 + } 142 + async fn get_user(self) -> Result<(), Error> { 143 + match self { 144 + RepoImpl::Duckdb(repo) => repo.get_user().await, 145 + RepoImpl::Postgres(repo) => repo.get_user().await, 146 + } 147 + } 148 + async fn create_tables(self) -> Result<(), Error> { 149 + match self { 150 + RepoImpl::Duckdb(repo) => repo.create_tables().await, 151 + RepoImpl::Postgres(repo) => repo.create_tables().await, 152 + } 153 + } 23 154 }
+11
crates/feed/src/repo/postgres/album.rs
··· 3 3 use sqlx::{Pool, Postgres}; 4 4 use tokio::sync::Mutex; 5 5 6 + use crate::types::AlbumRecord; 7 + 8 + #[derive(Clone)] 6 9 pub struct AlbumRepo { 7 10 pub pool: Arc<Mutex<Pool<Postgres>>>, 8 11 } ··· 10 13 impl AlbumRepo { 11 14 pub fn new(pool: Arc<Mutex<Pool<Postgres>>>) -> Self { 12 15 Self { pool } 16 + } 17 + 18 + pub async fn save_album(&self, _uri: &str, _record: AlbumRecord) -> Result<(), anyhow::Error> { 19 + todo!() 20 + } 21 + 22 + pub async fn get_albums(&self) -> Result<Vec<AlbumRecord>, anyhow::Error> { 23 + todo!() 13 24 } 14 25 }
+15
crates/feed/src/repo/postgres/artist.rs
··· 3 3 use sqlx::{Pool, Postgres}; 4 4 use tokio::sync::Mutex; 5 5 6 + use crate::{types::ArtistRecord, xata::artist::Artist}; 7 + 8 + #[derive(Clone)] 6 9 pub struct ArtistRepo { 7 10 pub pool: Arc<Mutex<Pool<Postgres>>>, 8 11 } ··· 10 13 impl ArtistRepo { 11 14 pub fn new(pool: Arc<Mutex<Pool<Postgres>>>) -> Self { 12 15 Self { pool } 16 + } 17 + 18 + pub async fn save_artist( 19 + &self, 20 + _uri: &str, 21 + _record: ArtistRecord, 22 + ) -> Result<(), anyhow::Error> { 23 + todo!() 24 + } 25 + 26 + pub async fn get_artists(&self) -> Result<Vec<Artist>, anyhow::Error> { 27 + todo!() 13 28 } 14 29 }
+26 -12
crates/feed/src/repo/postgres/mod.rs
··· 5 5 use sqlx::postgres::PgPoolOptions; 6 6 use tokio::sync::Mutex; 7 7 8 - use crate::repo::postgres::{ 9 - album::AlbumRepo, artist::ArtistRepo, scrobble::ScrobbleRepo, track::TrackRepo, user::UserRepo, 8 + use crate::{ 9 + repo::postgres::{ 10 + album::AlbumRepo, artist::ArtistRepo, scrobble::ScrobbleRepo, track::TrackRepo, 11 + user::UserRepo, 12 + }, 13 + types::{AlbumRecord, ArtistRecord, ScrobbleRecord, SongRecord}, 10 14 }; 11 15 12 16 use super::Repo; ··· 17 21 pub mod track; 18 22 pub mod user; 19 23 24 + #[derive(Clone)] 20 25 pub struct PostgresRepo { 21 26 pub album: AlbumRepo, 22 27 pub artist: ArtistRepo, ··· 44 49 45 50 #[async_trait] 46 51 impl Repo for PostgresRepo { 47 - async fn insert_album(self) -> Result<(), anyhow::Error> { 48 - todo!() 52 + async fn insert_album(self, uri: &str, record: AlbumRecord) -> Result<(), anyhow::Error> { 53 + self.album.save_album(uri, record).await 49 54 } 50 55 51 - async fn insert_artist(self) -> Result<(), anyhow::Error> { 52 - todo!() 56 + async fn insert_artist(self, uri: &str, record: ArtistRecord) -> Result<(), anyhow::Error> { 57 + self.artist.save_artist(uri, record).await 53 58 } 54 59 55 - async fn insert_scrobble(self) -> Result<(), anyhow::Error> { 56 - todo!() 60 + async fn insert_scrobble( 61 + self, 62 + did: &str, 63 + uri: &str, 64 + record: ScrobbleRecord, 65 + ) -> Result<(), anyhow::Error> { 66 + self.scrobble.save_scrobble(did, uri, record).await 57 67 } 58 68 59 - async fn insert_track(self) -> Result<(), anyhow::Error> { 60 - todo!() 69 + async fn insert_track(self, uri: &str, record: SongRecord) -> Result<(), anyhow::Error> { 70 + self.track.save_track(uri, record).await 61 71 } 62 72 63 - async fn insert_user(self) -> Result<(), anyhow::Error> { 64 - todo!() 73 + async fn insert_user(self, did: &str) -> Result<(), anyhow::Error> { 74 + self.user.save_user(did).await 65 75 } 66 76 67 77 async fn get_albums(self) -> Result<(), anyhow::Error> { ··· 97 107 } 98 108 99 109 async fn get_user(self) -> Result<(), anyhow::Error> { 110 + todo!() 111 + } 112 + 113 + async fn create_tables(self) -> Result<(), Error> { 100 114 todo!() 101 115 } 102 116 }
+16
crates/feed/src/repo/postgres/scrobble.rs
··· 3 3 use sqlx::{Pool, Postgres}; 4 4 use tokio::sync::Mutex; 5 5 6 + use crate::{types::ScrobbleRecord, xata::scrobble::Scrobble}; 7 + 8 + #[derive(Clone)] 6 9 pub struct ScrobbleRepo { 7 10 pub pool: Arc<Mutex<Pool<Postgres>>>, 8 11 } ··· 10 13 impl ScrobbleRepo { 11 14 pub fn new(pool: Arc<Mutex<Pool<Postgres>>>) -> Self { 12 15 Self { pool } 16 + } 17 + 18 + pub async fn save_scrobble( 19 + &self, 20 + _did: &str, 21 + _uri: &str, 22 + _record: ScrobbleRecord, 23 + ) -> Result<(), anyhow::Error> { 24 + todo!() 25 + } 26 + 27 + pub async fn get_scrobbles(&self) -> Result<Vec<Scrobble>, anyhow::Error> { 28 + todo!() 13 29 } 14 30 }
+7
crates/feed/src/repo/postgres/track.rs
··· 3 3 use sqlx::{Pool, Postgres}; 4 4 use tokio::sync::Mutex; 5 5 6 + use crate::types::SongRecord; 7 + 8 + #[derive(Clone)] 6 9 pub struct TrackRepo { 7 10 pub pool: Arc<Mutex<Pool<Postgres>>>, 8 11 } ··· 10 13 impl TrackRepo { 11 14 pub fn new(pool: Arc<Mutex<Pool<Postgres>>>) -> Self { 12 15 Self { pool } 16 + } 17 + 18 + pub async fn save_track(&self, _uri: &str, _record: SongRecord) -> Result<(), anyhow::Error> { 19 + todo!() 13 20 } 14 21 }
+5
crates/feed/src/repo/postgres/user.rs
··· 3 3 use sqlx::{Pool, Postgres}; 4 4 use tokio::sync::Mutex; 5 5 6 + #[derive(Clone)] 6 7 pub struct UserRepo { 7 8 pub pool: Arc<Mutex<Pool<Postgres>>>, 8 9 } ··· 10 11 impl UserRepo { 11 12 pub fn new(pool: Arc<Mutex<Pool<Postgres>>>) -> Self { 12 13 Self { pool } 14 + } 15 + 16 + pub async fn save_user(&self, _did: &str) -> Result<(), anyhow::Error> { 17 + todo!() 13 18 } 14 19 }
+55 -17
crates/feed/src/subscriber.rs
··· 1 - use std::{env, sync::Arc}; 2 - 3 - use anyhow::{Context, Error}; 1 + use anyhow::Error; 4 2 use owo_colors::OwoColorize; 5 - use sqlx::postgres::PgPoolOptions; 6 - use tokio::sync::Mutex; 7 3 use tokio_stream::StreamExt; 8 4 use tokio_tungstenite::connect_async; 9 5 use tungstenite::Message; 10 6 11 - use crate::types::Root; 7 + use crate::{ 8 + repo::{duckdb::DuckdbRepo, Repo, RepoImpl}, 9 + types::{AlbumRecord, ArtistRecord, Commit, Root, ScrobbleRecord, SongRecord}, 10 + ALBUM_NSID, ARTIST_NSID, SCROBBLE_NSID, SONG_NSID, 11 + }; 12 12 13 13 pub struct ScrobbleSubscriber { 14 14 pub service_url: String, ··· 22 22 } 23 23 24 24 pub async fn run(&self) -> Result<(), Error> { 25 - let db_url = env::var("XATA_POSTGRES_URL") 26 - .context("Failed to get XATA_POSTGRES_URL environment variable")?; 27 - 28 - let pool = PgPoolOptions::new() 29 - .max_connections(5) 30 - .connect(&db_url) 31 - .await?; 32 - let pool = Arc::new(Mutex::new(pool)); 33 - 34 25 let (mut ws_stream, _) = connect_async(&self.service_url).await?; 35 26 tracing::info!(url = %self.service_url.bright_green(), "Connected to jetstream at"); 27 + let ddb = DuckdbRepo::new().await?; 28 + ddb.clone().create_tables().await?; 29 + let ddb = RepoImpl::Duckdb(ddb); 36 30 37 31 while let Some(msg) = ws_stream.next().await { 38 32 match msg { 39 33 Ok(msg) => { 40 - if let Err(e) = handle_message(pool.clone(), msg) { 34 + if let Err(e) = handle_message(&ddb, msg) { 41 35 tracing::error!(error = %e, "Error handling message"); 42 36 } 43 37 } ··· 52 46 } 53 47 } 54 48 55 - fn handle_message(_pool: Arc<Mutex<sqlx::PgPool>>, msg: Message) -> Result<(), Error> { 49 + fn handle_message(ddb: &RepoImpl, msg: Message) -> Result<(), Error> { 50 + let ddb = ddb.clone(); 56 51 tokio::spawn(async move { 57 52 if let Message::Text(text) = msg { 58 53 let message: Root = serde_json::from_str(&text)?; ··· 62 57 } 63 58 64 59 tracing::info!(message = %text, "Received message"); 60 + 61 + if let Some(commit) = message.commit { 62 + match commit.operation.as_str() { 63 + "create" => save_scrobble(ddb, &message.did, commit).await?, 64 + _ => tracing::warn!(operation = %commit.operation, "Unknown operation"), 65 + } 66 + } 65 67 } 66 68 Ok::<(), Error>(()) 67 69 }); 68 70 Ok(()) 69 71 } 72 + 73 + async fn save_scrobble(ddb: RepoImpl, did: &str, commit: Commit) -> Result<(), Error> { 74 + if !vec![SCROBBLE_NSID, ARTIST_NSID, ALBUM_NSID, SONG_NSID] 75 + .contains(&commit.collection.as_str()) 76 + { 77 + return Ok(()); 78 + } 79 + 80 + match commit.collection.as_str() { 81 + SCROBBLE_NSID => { 82 + let record = serde_json::from_value::<ScrobbleRecord>(commit.record)?; 83 + let uri = format!("at://{}/app.rocksky.scrobble/{}", did, commit.rkey); 84 + ddb.insert_scrobble(did, &uri, record.clone()).await?; 85 + } 86 + ARTIST_NSID => { 87 + let record = serde_json::from_value::<ArtistRecord>(commit.record)?; 88 + let uri = format!("at://{}/app.rocksky.artist/{}", did, commit.rkey); 89 + ddb.insert_artist(&uri, record).await?; 90 + } 91 + ALBUM_NSID => { 92 + let record = serde_json::from_value::<AlbumRecord>(commit.record)?; 93 + let uri = format!("at://{}/app.rocksky.album/{}", did, commit.rkey); 94 + ddb.insert_album(&uri, record).await?; 95 + } 96 + SONG_NSID => { 97 + let record = serde_json::from_value::<SongRecord>(commit.record)?; 98 + let uri = format!("at://{}/app.rocksky.song/{}", did, commit.rkey); 99 + ddb.insert_track(&uri, record).await?; 100 + } 101 + _ => { 102 + tracing::warn!(collection = %commit.collection, "Unknown collection"); 103 + } 104 + } 105 + 106 + Ok(()) 107 + }
+19
crates/feed/src/sync.rs
··· 1 + use std::env; 2 + 3 + use anyhow::Error; 4 + use sqlx::postgres::PgPoolOptions; 5 + 6 + use crate::repo::{duckdb::DuckdbRepo, Repo, RepoImpl}; 7 + 8 + pub async fn sync_scrobbles() -> Result<(), Error> { 9 + tracing::info!("Starting scrobble synchronization..."); 10 + let pool = PgPoolOptions::new() 11 + .max_connections(5) 12 + .connect(&env::var("XATA_POSTGRES_URL")?) 13 + .await?; 14 + 15 + let repo = RepoImpl::Duckdb(DuckdbRepo::new().await?); 16 + repo.create_tables().await?; 17 + 18 + Ok(()) 19 + }
+185 -2
crates/feed/src/types.rs
··· 1 + use serde::{Deserialize, Serialize}; 1 2 use serde_json::Value; 2 3 3 4 #[derive(Debug, Clone)] ··· 24 25 25 26 #[derive(Debug, Clone)] 26 27 pub struct Scrobble {} 27 - 28 - use serde::{Deserialize, Serialize}; 29 28 30 29 #[derive(Serialize)] 31 30 pub(crate) struct DidDocument { ··· 74 73 pub cursor: Option<String>, 75 74 pub feed: Vec<SkeletonFeedScrobbleData>, 76 75 } 76 + 77 + // 78 + // Jetstream types 79 + // 80 + 81 + #[derive(Debug, Deserialize, Clone)] 82 + #[serde(rename_all = "camelCase")] 83 + pub struct ScrobbleRecord { 84 + #[serde(skip_serializing_if = "Option::is_none")] 85 + pub track_number: Option<i32>, 86 + #[serde(skip_serializing_if = "Option::is_none")] 87 + pub disc_number: Option<i32>, 88 + pub title: String, 89 + pub artist: String, 90 + pub album_artist: String, 91 + pub album: String, 92 + pub duration: i32, 93 + #[serde(skip_serializing_if = "Option::is_none")] 94 + pub release_date: Option<String>, 95 + #[serde(skip_serializing_if = "Option::is_none")] 96 + pub year: Option<i32>, 97 + #[serde(skip_serializing_if = "Option::is_none")] 98 + pub genre: Option<String>, 99 + #[serde(skip_serializing_if = "Option::is_none")] 100 + pub tags: Option<Vec<String>>, 101 + #[serde(skip_serializing_if = "Option::is_none")] 102 + pub composer: Option<String>, 103 + #[serde(skip_serializing_if = "Option::is_none")] 104 + pub lyrics: Option<String>, 105 + #[serde(skip_serializing_if = "Option::is_none")] 106 + pub copyright_message: Option<String>, 107 + #[serde(skip_serializing_if = "Option::is_none")] 108 + pub wiki: Option<String>, 109 + #[serde(skip_serializing_if = "Option::is_none")] 110 + pub album_art_url: Option<String>, 111 + #[serde(skip_serializing_if = "Option::is_none")] 112 + pub youtube_link: Option<String>, 113 + #[serde(skip_serializing_if = "Option::is_none")] 114 + pub spotify_link: Option<String>, 115 + #[serde(skip_serializing_if = "Option::is_none")] 116 + pub tidal_link: Option<String>, 117 + #[serde(skip_serializing_if = "Option::is_none")] 118 + pub apple_music_link: Option<String>, 119 + pub created_at: String, 120 + #[serde(skip_serializing_if = "Option::is_none")] 121 + pub label: Option<String>, 122 + #[serde(skip_serializing_if = "Option::is_none")] 123 + pub mbid: Option<String>, 124 + } 125 + 126 + #[derive(Debug, Deserialize, Clone)] 127 + #[serde(rename_all = "camelCase")] 128 + pub struct ArtistRecord { 129 + pub name: String, 130 + #[serde(skip_serializing_if = "Option::is_none")] 131 + pub bio: Option<String>, 132 + #[serde(skip_serializing_if = "Option::is_none")] 133 + pub picture_url: Option<String>, 134 + #[serde(skip_serializing_if = "Option::is_none")] 135 + pub tags: Option<Vec<String>>, 136 + #[serde(skip_serializing_if = "Option::is_none")] 137 + pub born: Option<String>, 138 + #[serde(skip_serializing_if = "Option::is_none")] 139 + pub died: Option<String>, 140 + #[serde(skip_serializing_if = "Option::is_none")] 141 + pub born_in: Option<String>, 142 + pub created_at: String, 143 + } 144 + 145 + #[derive(Debug, Deserialize, Clone)] 146 + #[serde(rename_all = "camelCase")] 147 + pub struct AlbumRecord { 148 + pub title: String, 149 + pub artist: String, 150 + #[serde(skip_serializing_if = "Option::is_none")] 151 + pub duration: Option<i32>, 152 + #[serde(skip_serializing_if = "Option::is_none")] 153 + pub release_date: Option<String>, 154 + #[serde(skip_serializing_if = "Option::is_none")] 155 + pub year: Option<i32>, 156 + #[serde(skip_serializing_if = "Option::is_none")] 157 + pub genre: Option<String>, 158 + #[serde(skip_serializing_if = "Option::is_none")] 159 + pub album_art_url: Option<String>, 160 + #[serde(skip_serializing_if = "Option::is_none")] 161 + pub tags: Option<Vec<String>>, 162 + #[serde(skip_serializing_if = "Option::is_none")] 163 + pub youtube_link: Option<String>, 164 + #[serde(skip_serializing_if = "Option::is_none")] 165 + pub spotify_link: Option<String>, 166 + #[serde(skip_serializing_if = "Option::is_none")] 167 + pub tidal_link: Option<String>, 168 + #[serde(skip_serializing_if = "Option::is_none")] 169 + pub apple_music_link: Option<String>, 170 + pub created_at: String, 171 + } 172 + 173 + #[derive(Debug, Deserialize, Clone)] 174 + #[serde(rename_all = "camelCase")] 175 + pub struct SongRecord { 176 + pub title: String, 177 + pub artist: String, 178 + pub album: String, 179 + pub album_artist: String, 180 + pub duration: i32, 181 + pub created_at: String, 182 + #[serde(skip_serializing_if = "Option::is_none")] 183 + pub track_number: Option<i32>, 184 + #[serde(skip_serializing_if = "Option::is_none")] 185 + pub disc_number: Option<i32>, 186 + #[serde(skip_serializing_if = "Option::is_none")] 187 + pub genre: Option<String>, 188 + #[serde(skip_serializing_if = "Option::is_none")] 189 + pub release_date: Option<String>, 190 + #[serde(skip_serializing_if = "Option::is_none")] 191 + pub year: Option<i32>, 192 + #[serde(skip_serializing_if = "Option::is_none")] 193 + pub tags: Option<Vec<String>>, 194 + #[serde(skip_serializing_if = "Option::is_none")] 195 + pub composer: Option<String>, 196 + #[serde(skip_serializing_if = "Option::is_none")] 197 + pub lyrics: Option<String>, 198 + #[serde(skip_serializing_if = "Option::is_none")] 199 + pub copyright_message: Option<String>, 200 + #[serde(skip_serializing_if = "Option::is_none")] 201 + pub wiki: Option<String>, 202 + #[serde(skip_serializing_if = "Option::is_none")] 203 + pub album_art_url: Option<String>, 204 + #[serde(skip_serializing_if = "Option::is_none")] 205 + pub youtube_link: Option<String>, 206 + #[serde(skip_serializing_if = "Option::is_none")] 207 + pub spotify_link: Option<String>, 208 + #[serde(skip_serializing_if = "Option::is_none")] 209 + pub tidal_link: Option<String>, 210 + #[serde(skip_serializing_if = "Option::is_none")] 211 + pub apple_music_link: Option<String>, 212 + #[serde(skip_serializing_if = "Option::is_none")] 213 + pub label: Option<String>, 214 + #[serde(skip_serializing_if = "Option::is_none")] 215 + pub mbid: Option<String>, 216 + } 217 + 218 + #[derive(Debug, Deserialize, Clone)] 219 + pub struct Ref { 220 + #[serde(rename = "$link")] 221 + pub link: String, 222 + } 223 + 224 + #[derive(Debug, Deserialize, Clone)] 225 + #[serde(rename_all = "camelCase")] 226 + pub struct Blob { 227 + #[serde(rename = "$type")] 228 + pub r#type: String, 229 + pub r#ref: Ref, 230 + pub mime_type: String, 231 + pub size: i32, 232 + } 233 + 234 + #[derive(Debug, Deserialize)] 235 + pub struct PinnedPost { 236 + pub cid: String, 237 + pub uri: String, 238 + } 239 + 240 + #[derive(Debug, Deserialize)] 241 + #[serde(rename_all = "camelCase")] 242 + pub struct Profile { 243 + #[serde(rename = "$type")] 244 + pub r#type: String, 245 + pub avatar: Option<Blob>, 246 + pub banner: Option<Blob>, 247 + pub created_at: Option<String>, 248 + pub pinned_post: Option<PinnedPost>, 249 + pub description: Option<String>, 250 + pub display_name: Option<String>, 251 + pub handle: Option<String>, 252 + } 253 + 254 + #[derive(Debug, Deserialize)] 255 + pub struct ProfileResponse { 256 + pub uri: String, 257 + pub cid: String, 258 + pub value: Profile, 259 + }
+4
crates/rockskyd/src/cmd/feed.rs
··· 4 4 rocksky_feed::run().await?; 5 5 Ok(()) 6 6 } 7 + 8 + pub async fn sync() -> Result<(), Error> { 9 + rocksky_feed::sync::sync_scrobbles().await 10 + }
+2
crates/rockskyd/src/main.rs
··· 40 40 Command::new("feed") 41 41 .about("Feed related commands") 42 42 .subcommand(Command::new("serve").about("Serve the Rocksky Feed API")) 43 + .subcommand(Command::new("sync").about("Sync scrobbles feed data to DuckDB")), 43 44 ) 44 45 } 45 46 ··· 99 100 } 100 101 Some(("feed", sub_m)) => match sub_m.subcommand() { 101 102 Some(("serve", _)) => cmd::feed::serve().await?, 103 + Some(("sync", _)) => cmd::feed::sync().await?, 102 104 _ => println!("Unknown feed command"), 103 105 }, 104 106 _ => {