A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
98
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: integrate r2d2 for DuckDB connection management and enable sync functionality in feed operations

+243 -152
+21
Cargo.lock
··· 4940 4940 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" 4941 4941 4942 4942 [[package]] 4943 + name = "r2d2" 4944 + version = "0.8.10" 4945 + source = "registry+https://github.com/rust-lang/crates.io-index" 4946 + checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" 4947 + dependencies = [ 4948 + "log", 4949 + "parking_lot", 4950 + "scheduled-thread-pool", 4951 + ] 4952 + 4953 + [[package]] 4943 4954 name = "radium" 4944 4955 version = "0.7.0" 4945 4956 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 5369 5380 "dotenv", 5370 5381 "duckdb", 5371 5382 "owo-colors", 5383 + "r2d2", 5372 5384 "reqwest", 5373 5385 "serde", 5374 5386 "serde_json", ··· 5942 5954 checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" 5943 5955 dependencies = [ 5944 5956 "windows-sys 0.61.1", 5957 + ] 5958 + 5959 + [[package]] 5960 + name = "scheduled-thread-pool" 5961 + version = "0.2.7" 5962 + source = "registry+https://github.com/rust-lang/crates.io-index" 5963 + checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" 5964 + dependencies = [ 5965 + "parking_lot", 5945 5966 ] 5946 5967 5947 5968 [[package]]
+1
crates/feed/Cargo.toml
··· 41 41 async-trait = "0.1.89" 42 42 xid = "1.1.1" 43 43 sha256 = "1.6.0" 44 + r2d2 = "0.8.10"
+21 -3
crates/feed/src/feed.rs
··· 1 1 use crate::config::Config; 2 2 use crate::feed_handler::FeedHandler; 3 + use crate::repo::duckdb::DuckdbRepo; 4 + use crate::repo::{Repo, RepoImpl}; 3 5 use crate::subscriber::ScrobbleSubscriber; 6 + use crate::sync::sync_scrobbles; 4 7 use crate::types::{DidDocument, FeedSkeleton, Request, Service, SkeletonFeedScrobbleData}; 5 8 use anyhow::Error; 6 9 use atrium_api::app::bsky::feed::get_feed_skeleton::Parameters as FeedSkeletonQuery; ··· 29 32 &mut self, 30 33 name: impl AsRef<str>, 31 34 address: impl Into<SocketAddr> + Debug + Clone + Send, 35 + enable_sync: bool, 32 36 ) -> impl std::future::Future<Output = Result<(), Error>> + Send { 33 - self.start_with_config(name, Config::load_env_config(), address) 37 + self.start_with_config(name, Config::load_env_config(), address, enable_sync) 34 38 } 35 39 36 40 /// Starts the feed generator server & connects to the firehose. ··· 47 51 name: impl AsRef<str>, 48 52 config: Config, 49 53 address: impl Into<SocketAddr> + Debug + Clone + Send, 54 + enable_sync: bool, 50 55 ) -> impl std::future::Future<Output = Result<(), Error>> + Send { 51 56 let handler = self.handler(); 52 57 let address = address.clone(); ··· 115 120 ); 116 121 } 117 122 })); 123 + 124 + let ddb = DuckdbRepo::new().await?; 125 + let ddb = RepoImpl::Duckdb(ddb); 126 + ddb.clone().create_tables().await?; 127 + let ddb_clone = ddb.clone(); 128 + 129 + let sync_feed = tokio::spawn(async move { 130 + if !enable_sync { 131 + return Ok::<(), Error>(()); 132 + } 133 + sync_scrobbles(Some(ddb_clone)).await?; 134 + Ok::<(), Error>(()) 135 + }); 118 136 let feed_server = warp::serve(routes); 119 137 let firehose_listener = tokio::spawn(async move { 120 138 let jetstream_server = env::var("JETSTREAM_SERVER") ··· 125 143 ); 126 144 let subscriber = ScrobbleSubscriber::new(&url); 127 145 128 - match subscriber.run().await { 146 + match subscriber.run(ddb).await { 129 147 Ok(_) => tracing::info!("Firehose listener exited normally"), 130 148 Err(e) => tracing::error!(error = %e, "Firehose listener exited with error"), 131 149 } ··· 133 151 Ok::<(), Error>(()) 134 152 }); 135 153 136 - tokio::join!(feed_server.run(address), firehose_listener) 154 + tokio::join!(feed_server.run(address), firehose_listener, sync_feed) 137 155 .1 138 156 .expect("Couldn't await tasks")?; 139 157
+3 -2
crates/feed/src/lib.rs
··· 16 16 pub mod feed; 17 17 pub mod feed_handler; 18 18 pub mod feeds; 19 + mod r2d2_duckdb; 19 20 pub mod repo; 20 21 pub mod subscriber; 21 22 pub mod sync; ··· 63 64 } 64 65 } 65 66 66 - pub async fn run() -> Result<(), Error> { 67 + pub async fn run(enable_sync: bool) -> Result<(), Error> { 67 68 let conn = Connection::open(DB_PATH)?; 68 69 let pool = PgPoolOptions::new() 69 70 .max_connections(5) ··· 81 82 let addr_str = format!("{}:{}", host, port); 82 83 let addr: SocketAddr = addr_str.parse().expect("Invalid address format"); 83 84 84 - feed.start("RecentlyPlayed", addr).await?; 85 + feed.start("RecentlyPlayed", addr, enable_sync).await?; 85 86 Ok(()) 86 87 }
+46
crates/feed/src/r2d2_duckdb.rs
··· 1 + extern crate duckdb; 2 + extern crate r2d2; 3 + 4 + use duckdb::{params, Connection, Error}; 5 + use std::path::{Path, PathBuf}; 6 + 7 + enum ConnectionConfig { 8 + File(PathBuf), 9 + Memory, 10 + } 11 + 12 + /// An `r2d2::ManageConnection` for `ruDuckDB::Connection`s. 13 + pub struct DuckDBConnectionManager(ConnectionConfig); 14 + 15 + impl DuckDBConnectionManager { 16 + /// Creates a new `DuckDBConnectionManager` from file. 17 + /// 18 + pub fn file<P: AsRef<Path>>(path: P) -> Self { 19 + DuckDBConnectionManager(ConnectionConfig::File(path.as_ref().to_path_buf())) 20 + } 21 + 22 + pub fn memory() -> Self { 23 + DuckDBConnectionManager(ConnectionConfig::Memory) 24 + } 25 + } 26 + 27 + impl r2d2::ManageConnection for DuckDBConnectionManager { 28 + type Connection = Connection; 29 + type Error = duckdb::Error; 30 + 31 + fn connect(&self) -> Result<Connection, Error> { 32 + match self.0 { 33 + ConnectionConfig::File(ref path) => Connection::open(path), 34 + ConnectionConfig::Memory => Connection::open_in_memory(), 35 + } 36 + } 37 + 38 + fn is_valid(&self, conn: &mut Connection) -> Result<(), Error> { 39 + let _ = conn.execute("", params![]); 40 + Ok(()) 41 + } 42 + 43 + fn has_broken(&self, _: &mut Connection) -> bool { 44 + false 45 + } 46 + }
+25 -28
crates/feed/src/repo/duckdb/album.rs
··· 1 - use crate::{repo::duckdb::DB_PATH, types::AlbumRecord}; 1 + use crate::r2d2_duckdb::DuckDBConnectionManager; 2 + use crate::types::AlbumRecord; 2 3 use anyhow::Error; 3 4 use duckdb::params; 4 5 5 - pub async fn save_album(uri: &str, record: AlbumRecord) -> Result<(), anyhow::Error> { 6 + pub async fn save_album( 7 + pool: r2d2::Pool<DuckDBConnectionManager>, 8 + uri: &str, 9 + record: AlbumRecord, 10 + ) -> Result<(), Error> { 6 11 let uri = uri.to_string(); 7 - tokio::task::spawn_blocking(move || -> Result<(), Error> { 8 - let conn = duckdb::Connection::open(DB_PATH)?; 12 + let conn = pool.get()?; 9 13 10 - let album_hash = 11 - sha256::digest(format!("{} - {}", record.title, record.artist).to_lowercase()); 14 + let album_hash = sha256::digest(format!("{} - {}", record.title, record.artist).to_lowercase()); 12 15 13 - match conn.execute( 14 - "INSERT INTO albums ( 16 + match conn.execute( 17 + "INSERT INTO albums ( 15 18 id, 16 19 title, 17 20 artist, ··· 36 39 release_date = EXCLUDED.release_date, 37 40 artist = EXCLUDED.artist, 38 41 title = EXCLUDED.title;", 39 - params![ 40 - xid::new().to_string(), 41 - record.title, 42 - record.artist, 43 - record.release_date, 44 - record.album_art_url, 45 - record.year, 46 - album_hash, 47 - uri 48 - ], 49 - ) { 50 - Ok(x) => tracing::info!("Album successfully inserted or updated: {}", x), 51 - Err(e) => tracing::error!(error = %e, "Error inserting/updating album"), 52 - } 53 - 54 - conn.close() 55 - .map_err(|(_, e)| Error::msg(format!("Error closing connection: {}", e)))?; 56 - Ok(()) 57 - }) 58 - .await??; 42 + params![ 43 + xid::new().to_string(), 44 + record.title, 45 + record.artist, 46 + record.release_date, 47 + record.album_art_url, 48 + record.year, 49 + album_hash, 50 + uri 51 + ], 52 + ) { 53 + Ok(x) => tracing::info!("Album successfully inserted or updated: {}", x), 54 + Err(e) => tracing::error!(error = %e, "Error inserting/updating album"), 55 + } 59 56 60 57 Ok(()) 61 58 }
+32 -35
crates/feed/src/repo/duckdb/artist.rs
··· 1 - use crate::{repo::duckdb::DB_PATH, types::ArtistRecord}; 1 + use crate::{r2d2_duckdb::DuckDBConnectionManager, types::ArtistRecord}; 2 2 use anyhow::Error; 3 3 use duckdb::params; 4 4 5 - pub async fn save_artist(uri: &str, record: ArtistRecord) -> Result<(), anyhow::Error> { 5 + pub async fn save_artist( 6 + pool: r2d2::Pool<DuckDBConnectionManager>, 7 + uri: &str, 8 + record: ArtistRecord, 9 + ) -> Result<(), Error> { 6 10 let uri = uri.to_string(); 7 - tokio::task::spawn_blocking(move || -> Result<(), Error> { 8 - let conn = duckdb::Connection::open(DB_PATH)?; 11 + let conn = pool.get()?; 9 12 10 - let artist_hash = sha256::digest(record.name.to_lowercase()); 11 - match conn.execute( 12 - &format!( 13 - "INSERT INTO artists ( 13 + let artist_hash = sha256::digest(record.name.to_lowercase()); 14 + match conn.execute( 15 + &format!( 16 + "INSERT INTO artists ( 14 17 id, 15 18 name, 16 19 picture, ··· 27 30 ) ON CONFLICT (sha256) DO UPDATE SET 28 31 uri = EXCLUDED.uri, 29 32 tags = EXCLUDED.tags;", 30 - record 31 - .tags 32 - .as_ref() 33 - .map(|tags| tags 34 - .iter() 35 - .map(|tag| format!("'{}'", tag)) 36 - .collect::<Vec<_>>() 37 - .join(", ")) 38 - .unwrap_or_default() 39 - ), 40 - params![ 41 - xid::new().to_string(), 42 - record.name, 43 - record.picture_url, 44 - artist_hash, 45 - uri 46 - ], 47 - ) { 48 - Ok(x) => tracing::info!("Artist successfully inserted or updated: {}", x), 49 - Err(e) => tracing::error!(error = %e, "Error inserting/updating artist"), 50 - } 51 - 52 - conn.close() 53 - .map_err(|(_, e)| Error::msg(format!("Error closing connection: {}", e)))?; 54 - Ok(()) 55 - }) 56 - .await??; 33 + record 34 + .tags 35 + .as_ref() 36 + .map(|tags| tags 37 + .iter() 38 + .map(|tag| format!("'{}'", tag)) 39 + .collect::<Vec<_>>() 40 + .join(", ")) 41 + .unwrap_or_default() 42 + ), 43 + params![ 44 + xid::new().to_string(), 45 + record.name, 46 + record.picture_url, 47 + artist_hash, 48 + uri 49 + ], 50 + ) { 51 + Ok(x) => tracing::info!("Artist successfully inserted or updated: {}", x), 52 + Err(e) => tracing::error!(error = %e, "Error inserting/updating artist"), 53 + } 57 54 58 55 Ok(()) 59 56 }
+11 -13
crates/feed/src/repo/duckdb/mod.rs
··· 1 - use std::sync::Arc; 2 - 3 1 use crate::{ 4 2 repo::duckdb::{ 5 3 album::save_album, artist::save_artist, scrobble::save_scrobble, track::save_track, ··· 9 7 }; 10 8 11 9 use super::Repo; 10 + use crate::r2d2_duckdb::DuckDBConnectionManager; 12 11 use anyhow::Error; 13 12 use async_trait::async_trait; 14 - use tokio::sync::Mutex; 15 13 16 14 pub mod album; 17 15 pub mod artist; ··· 23 21 24 22 #[derive(Clone)] 25 23 pub struct DuckdbRepo { 26 - pub conn: Arc<Mutex<duckdb::Connection>>, 24 + pool: r2d2::Pool<DuckDBConnectionManager>, 27 25 } 28 26 29 27 impl DuckdbRepo { 30 28 pub async fn new() -> Result<Self, Error> { 31 - let conn = duckdb::Connection::open(DB_PATH)?; 32 - let conn = Arc::new(Mutex::new(conn)); 33 - Ok(Self { conn: conn.clone() }) 29 + let manager = DuckDBConnectionManager::file(DB_PATH); 30 + let pool = r2d2::Pool::builder().build(manager)?; 31 + Ok(Self { pool }) 34 32 } 35 33 } 36 34 37 35 #[async_trait] 38 36 impl Repo for DuckdbRepo { 39 37 async fn insert_album(self, uri: &str, record: AlbumRecord) -> Result<(), anyhow::Error> { 40 - save_album(uri, record).await 38 + save_album(self.pool.clone(), uri, record).await 41 39 } 42 40 43 41 async fn insert_artist(self, uri: &str, record: ArtistRecord) -> Result<(), anyhow::Error> { 44 - save_artist(uri, record).await 42 + save_artist(self.pool.clone(), uri, record).await 45 43 } 46 44 47 45 async fn insert_scrobble( ··· 50 48 uri: &str, 51 49 record: ScrobbleRecord, 52 50 ) -> Result<(), anyhow::Error> { 53 - save_scrobble(did, uri, record).await 51 + save_scrobble(self.pool.clone(), did, uri, record).await 54 52 } 55 53 56 54 async fn insert_track(self, uri: &str, record: SongRecord) -> Result<(), anyhow::Error> { 57 - save_track(uri, record).await 55 + save_track(self.pool.clone(), uri, record).await 58 56 } 59 57 60 58 async fn insert_user(self, did: &str) -> Result<(), anyhow::Error> { 61 - save_user(did).await 59 + save_user(self.pool.clone(), did).await 62 60 } 63 61 64 62 async fn get_albums(self) -> Result<(), anyhow::Error> { ··· 98 96 } 99 97 100 98 async fn create_tables(self) -> Result<(), anyhow::Error> { 101 - let conn = self.conn.lock().await; 99 + let conn = self.pool.get()?; 102 100 conn.execute_batch( 103 101 "BEGIN; 104 102 CREATE TABLE IF NOT EXISTS artists (
+5 -7
crates/feed/src/repo/duckdb/scrobble.rs
··· 1 1 use anyhow::Error; 2 2 use duckdb::{params, OptionalExt}; 3 3 4 - use crate::{did::did_to_profile, repo::duckdb::DB_PATH, types::ScrobbleRecord}; 4 + use crate::{did::did_to_profile, r2d2_duckdb::DuckDBConnectionManager, types::ScrobbleRecord}; 5 5 6 6 pub async fn save_scrobble( 7 + pool: r2d2::Pool<DuckDBConnectionManager>, 7 8 did: &str, 8 9 uri: &str, 9 10 record: ScrobbleRecord, 10 - ) -> Result<(), anyhow::Error> { 11 + ) -> Result<(), Error> { 11 12 let did = did.to_string(); 12 13 let cloned_did = did.clone(); 13 14 14 15 let uri = uri.to_string(); 15 16 16 17 tokio::task::spawn_blocking(move || -> Result<(), Error> { 17 - let mut conn = duckdb::Connection::open(DB_PATH)?; 18 + let mut conn = pool.get()?; 18 19 let tx = conn.transaction()?; 19 20 let mut user = tx.prepare("SELECT id FROM users WHERE did = ?")?; 20 21 let user_id: Option<String> = user.query_row(params![did], |row| row.get(0)).optional()?; ··· 337 338 338 339 tx.commit()?; 339 340 340 - conn.close() 341 - .map_err(|(_, e)| Error::msg(format!("Error closing connection: {}", e)))?; 342 - 343 - Ok(()) 341 + Ok::<(), Error>(()) 344 342 }) 345 343 .await??; 346 344
+38 -41
crates/feed/src/repo/duckdb/track.rs
··· 1 - use crate::{repo::duckdb::DB_PATH, types::SongRecord}; 1 + use crate::{r2d2_duckdb::DuckDBConnectionManager, types::SongRecord}; 2 2 use anyhow::Error; 3 3 use duckdb::params; 4 4 5 - pub async fn save_track(uri: &str, record: SongRecord) -> Result<(), anyhow::Error> { 5 + pub async fn save_track( 6 + pool: r2d2::Pool<DuckDBConnectionManager>, 7 + uri: &str, 8 + record: SongRecord, 9 + ) -> Result<(), Error> { 6 10 let uri = uri.to_string(); 7 - tokio::task::spawn_blocking(move || -> Result<(), Error> { 8 - let conn = duckdb::Connection::open(DB_PATH)?; 9 - let track_hash = sha256::digest( 10 - format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(), 11 - ); 11 + let conn = pool.get()?; 12 + let track_hash = sha256::digest( 13 + format!("{} - {} - {}", record.title, record.artist, record.album).to_lowercase(), 14 + ); 12 15 13 - match conn.execute( 14 - "INSERT INTO tracks ( 16 + match conn.execute( 17 + "INSERT INTO tracks ( 15 18 id, 16 19 title, 17 20 artist, ··· 56 59 ) ON CONFLICT (sha256) DO UPDATE SET 57 60 uri = EXCLUDED.uri; 58 61 ", 59 - params![ 60 - xid::new().to_string(), 61 - record.title, 62 - record.artist, 63 - record.album_artist, 64 - record.album_art_url, 65 - record.album, 66 - record.track_number, 67 - record.disc_number, 68 - record.spotify_link, 69 - record.tidal_link, 70 - record.youtube_link, 71 - record.apple_music_link, 72 - record.copyright_message, 73 - record.label, 74 - record.lyrics, 75 - record.composer, 76 - record.duration, 77 - record.mbid, 78 - track_hash, 79 - uri 80 - ], 81 - ) { 82 - Ok(x) => tracing::info!("Track successfully inserted or updated: {}", x), 83 - Err(e) => tracing::error!(error = %e, "Error inserting/updating track"), 84 - } 85 - 86 - conn.close() 87 - .map_err(|(_, e)| Error::msg(format!("Error closing connection: {}", e)))?; 88 - Ok(()) 89 - }) 90 - .await??; 62 + params![ 63 + xid::new().to_string(), 64 + record.title, 65 + record.artist, 66 + record.album_artist, 67 + record.album_art_url, 68 + record.album, 69 + record.track_number, 70 + record.disc_number, 71 + record.spotify_link, 72 + record.tidal_link, 73 + record.youtube_link, 74 + record.apple_music_link, 75 + record.copyright_message, 76 + record.label, 77 + record.lyrics, 78 + record.composer, 79 + record.duration, 80 + record.mbid, 81 + track_hash, 82 + uri 83 + ], 84 + ) { 85 + Ok(x) => tracing::info!("Track successfully inserted or updated: {}", x), 86 + Err(e) => tracing::error!(error = %e, "Error inserting/updating track"), 87 + } 91 88 92 89 Ok(()) 93 90 }
+6 -1
crates/feed/src/repo/duckdb/user.rs
··· 1 - pub async fn save_user(_did: &str) -> Result<(), anyhow::Error> { 1 + use crate::r2d2_duckdb::DuckDBConnectionManager; 2 + 3 + pub async fn save_user( 4 + _pool: r2d2::Pool<DuckDBConnectionManager>, 5 + _did: &str, 6 + ) -> Result<(), anyhow::Error> { 2 7 todo!() 3 8 }
+11 -14
crates/feed/src/subscriber.rs
··· 5 5 use tungstenite::Message; 6 6 7 7 use crate::{ 8 - repo::{duckdb::DuckdbRepo, Repo, RepoImpl}, 8 + repo::{Repo, RepoImpl}, 9 9 types::{AlbumRecord, ArtistRecord, Commit, Root, ScrobbleRecord, SongRecord}, 10 10 ALBUM_NSID, ARTIST_NSID, SCROBBLE_NSID, SONG_NSID, 11 11 }; ··· 21 21 } 22 22 } 23 23 24 - pub async fn run(&self) -> Result<(), Error> { 24 + pub async fn run(&self, repo: RepoImpl) -> Result<(), Error> { 25 25 let (mut ws_stream, _) = connect_async(&self.service_url).await?; 26 26 tracing::info!(url = %self.service_url.bright_green(), "Connected to jetstream at"); 27 - let ddb = DuckdbRepo::new().await?; 28 - ddb.clone().create_tables().await?; 29 - let ddb = RepoImpl::Duckdb(ddb); 30 27 31 28 while let Some(msg) = ws_stream.next().await { 32 29 match msg { 33 30 Ok(msg) => { 34 - if let Err(e) = handle_message(&ddb, msg) { 31 + if let Err(e) = handle_message(&repo, msg) { 35 32 tracing::error!(error = %e, "Error handling message"); 36 33 } 37 34 } ··· 46 43 } 47 44 } 48 45 49 - fn handle_message(ddb: &RepoImpl, msg: Message) -> Result<(), Error> { 50 - let ddb = ddb.clone(); 46 + fn handle_message(repo: &RepoImpl, msg: Message) -> Result<(), Error> { 47 + let repo = repo.clone(); 51 48 tokio::spawn(async move { 52 49 if let Message::Text(text) = msg { 53 50 let message: Root = serde_json::from_str(&text)?; ··· 60 57 61 58 if let Some(commit) = message.commit { 62 59 match commit.operation.as_str() { 63 - "create" => save_scrobble(ddb, &message.did, commit).await?, 60 + "create" => save_scrobble(repo, &message.did, commit).await?, 64 61 _ => tracing::warn!(operation = %commit.operation, "Unknown operation"), 65 62 } 66 63 } ··· 70 67 Ok(()) 71 68 } 72 69 73 - async fn save_scrobble(ddb: RepoImpl, did: &str, commit: Commit) -> Result<(), Error> { 70 + async fn save_scrobble(repo: RepoImpl, did: &str, commit: Commit) -> Result<(), Error> { 74 71 if !vec![SCROBBLE_NSID, ARTIST_NSID, ALBUM_NSID, SONG_NSID] 75 72 .contains(&commit.collection.as_str()) 76 73 { ··· 81 78 SCROBBLE_NSID => { 82 79 let record = serde_json::from_value::<ScrobbleRecord>(commit.record)?; 83 80 let uri = format!("at://{}/app.rocksky.scrobble/{}", did, commit.rkey); 84 - ddb.insert_scrobble(did, &uri, record.clone()).await?; 81 + repo.insert_scrobble(did, &uri, record.clone()).await?; 85 82 } 86 83 ARTIST_NSID => { 87 84 let record = serde_json::from_value::<ArtistRecord>(commit.record)?; 88 85 let uri = format!("at://{}/app.rocksky.artist/{}", did, commit.rkey); 89 - ddb.insert_artist(&uri, record).await?; 86 + repo.insert_artist(&uri, record).await?; 90 87 } 91 88 ALBUM_NSID => { 92 89 let record = serde_json::from_value::<AlbumRecord>(commit.record)?; 93 90 let uri = format!("at://{}/app.rocksky.album/{}", did, commit.rkey); 94 - ddb.insert_album(&uri, record).await?; 91 + repo.insert_album(&uri, record).await?; 95 92 } 96 93 SONG_NSID => { 97 94 let record = serde_json::from_value::<SongRecord>(commit.record)?; 98 95 let uri = format!("at://{}/app.rocksky.song/{}", did, commit.rkey); 99 - ddb.insert_track(&uri, record).await?; 96 + repo.insert_track(&uri, record).await?; 100 97 } 101 98 _ => { 102 99 tracing::warn!(collection = %commit.collection, "Unknown collection");
+10 -2
crates/feed/src/sync.rs
··· 8 8 use crate::repo::{duckdb::DuckdbRepo, Repo, RepoImpl}; 9 9 use crate::types::ScrobbleRecord; 10 10 11 - pub async fn sync_scrobbles() -> Result<(), Error> { 11 + pub async fn sync_scrobbles(ddb: Option<RepoImpl>) -> Result<(), Error> { 12 12 tracing::info!("Starting scrobble synchronization..."); 13 13 14 14 let (tx, mut rx) = tokio::sync::mpsc::channel::<PgRow>(500); ··· 60 60 LEFT JOIN artists ar ON s.artist_id = ar.xata_id 61 61 LEFT JOIN tracks t ON s.track_id = t.xata_id 62 62 LEFT JOIN users u ON s.user_id = u.xata_id 63 + ORDER BY s.timestamp DESC 63 64 LIMIT $1 OFFSET $2 64 65 "#, 65 66 ) ··· 135 136 song_uri: row.get::<Option<String>, _>("track_uri"), 136 137 }; 137 138 138 - let repo = DuckdbRepo::new().await?; 139 + let repo = match ddb { 140 + Some(RepoImpl::Duckdb(_)) => RepoImpl::Duckdb(DuckdbRepo::new().await?), 141 + Some(RepoImpl::Postgres(_)) => { 142 + unimplemented!("Postgres repo not implemented yet"); 143 + } 144 + None => RepoImpl::Duckdb(DuckdbRepo::new().await?), 145 + }; 146 + 139 147 repo.insert_scrobble(&did, &scrobble_uri, record).await?; 140 148 141 149 i += 1;
+3 -3
crates/rockskyd/src/cmd/feed.rs
··· 1 1 use anyhow::Error; 2 2 3 - pub async fn serve() -> Result<(), Error> { 4 - rocksky_feed::run().await?; 3 + pub async fn serve(enable_sync: bool) -> Result<(), Error> { 4 + rocksky_feed::run(enable_sync).await?; 5 5 Ok(()) 6 6 } 7 7 8 8 pub async fn sync() -> Result<(), Error> { 9 - rocksky_feed::sync::sync_scrobbles().await 9 + rocksky_feed::sync::sync_scrobbles(None).await 10 10 }
+10 -3
crates/rockskyd/src/main.rs
··· 1 - use clap::Command; 1 + use clap::{arg, Command}; 2 2 use dotenv::dotenv; 3 3 use tracing_subscriber::fmt::format::Format; 4 4 ··· 39 39 .subcommand( 40 40 Command::new("feed") 41 41 .about("Feed related commands") 42 - .subcommand(Command::new("serve").about("Serve the Rocksky Feed API")) 42 + .subcommand( 43 + Command::new("serve") 44 + .arg(arg!(--sync "Enable sync mode").required(false)) 45 + .about("Serve the Rocksky Feed API"), 46 + ) 43 47 .subcommand(Command::new("sync").about("Sync scrobbles feed data to DuckDB")), 44 48 ) 45 49 } ··· 99 103 cmd::pull::pull_data().await?; 100 104 } 101 105 Some(("feed", sub_m)) => match sub_m.subcommand() { 102 - Some(("serve", _)) => cmd::feed::serve().await?, 106 + Some(("serve", args)) => { 107 + let enable_sync = args.get_flag("sync"); 108 + cmd::feed::serve(enable_sync).await? 109 + } 103 110 Some(("sync", _)) => cmd::feed::sync().await?, 104 111 _ => println!("Unknown feed command"), 105 112 },