A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
98
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: refactor feed handling to support RepoImpl and improve synchronization logic

+60 -38
+23 -29
crates/feed/src/feed.rs
··· 1 1 use crate::config::Config; 2 2 use crate::feed_handler::FeedHandler; 3 3 use crate::repo::duckdb::DuckdbRepo; 4 + use crate::repo::postgres::PostgresRepo; 4 5 use crate::repo::{Repo, RepoImpl}; 5 6 use crate::subscriber::ScrobbleSubscriber; 6 7 use crate::sync::sync_scrobbles; ··· 8 9 use anyhow::Error; 9 10 use atrium_api::app::bsky::feed::get_feed_skeleton::Parameters as FeedSkeletonQuery; 10 11 use atrium_api::app::bsky::feed::get_feed_skeleton::ParametersData as FeedSkeletonParameters; 11 - use sqlx::postgres::PgPoolOptions; 12 - use sqlx::{Pool, Postgres}; 13 12 use std::env; 14 13 use std::fmt::Debug; 15 14 use std::net::SocketAddr; 16 - use std::sync::Arc; 17 15 use warp::Filter; 18 16 19 17 /// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods. ··· 58 56 let feed_name = name.as_ref().to_string(); 59 57 60 58 async move { 59 + let ddb = DuckdbRepo::new().await?; 60 + let ddb = RepoImpl::Duckdb(ddb); 61 + ddb.clone().create_tables().await?; 62 + let ddb_clone = ddb.clone(); 63 + 61 64 let config = config; 62 - let pool = PgPoolOptions::new() 63 - .max_connections(5) 64 - .connect(&env::var("XATA_POSTGRES_URL")?) 65 - .await?; 66 - let pool = Arc::new(pool); 67 - let db_filter = warp::any().map(move || pool.clone()); 65 + let pg = RepoImpl::Postgres(PostgresRepo::new().await?); 66 + let pg_filter = warp::any().map(move || pg.clone()); 68 67 69 68 let did_config = config.clone(); 70 69 let did_json = warp::path(".well-known") ··· 75 74 let describe_feed_generator = warp::path("xrpc") 76 75 .and(warp::path("app.rocksky.feed.describeFeedGenerator")) 77 76 .and(warp::get()) 78 - .and(db_filter.clone()) 79 - .and_then(move |_pool: Arc<Pool<Postgres>>| { 80 - describe_feed_generator(feed_name.clone()) 81 - }); 77 + .and(pg_filter.clone()) 78 + .and_then(move |_repo: RepoImpl| describe_feed_generator(feed_name.clone())); 82 79 83 80 let get_feed_handler = handler.clone(); 84 81 let get_feed_skeleton = warp::path("xrpc") 85 82 .and(warp::path("app.rocksky.feed.getFeedSkeleton")) 86 83 .and(warp::get()) 87 84 .and(warp::query::<FeedSkeletonParameters>()) 88 - .and(db_filter.clone()) 89 - .and_then( 90 - move |query: FeedSkeletonParameters, _pool: Arc<Pool<Postgres>>| { 91 - get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone()) 92 - }, 93 - ); 85 + .and(pg_filter.clone()) 86 + .and_then(move |query: FeedSkeletonParameters, repo: RepoImpl| { 87 + get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone(), repo) 88 + }); 94 89 95 90 let api = did_json.or(describe_feed_generator).or(get_feed_skeleton); 96 91 ··· 120 115 ); 121 116 } 122 117 })); 123 - 124 - let ddb = DuckdbRepo::new().await?; 125 - let ddb = RepoImpl::Duckdb(ddb); 126 - ddb.clone().create_tables().await?; 127 - let ddb_clone = ddb.clone(); 128 118 129 119 let sync_feed = tokio::spawn(async move { 130 120 if !enable_sync { ··· 187 177 async fn get_feed_skeleton<Handler: FeedHandler>( 188 178 query: FeedSkeletonQuery, 189 179 handler: Handler, 180 + repo: RepoImpl, 190 181 ) -> Result<impl warp::Reply, warp::Rejection> { 191 182 let skeleton = handler 192 - .serve_feed(Request { 193 - cursor: query.cursor.clone(), 194 - feed: query.feed.clone(), 195 - limit: query.limit.map(u8::from), 196 - }) 183 + .serve_feed( 184 + repo, 185 + Request { 186 + cursor: query.cursor.clone(), 187 + feed: query.feed.clone(), 188 + limit: query.limit.map(u8::from), 189 + }, 190 + ) 197 191 .await; 198 192 199 193 Ok::<warp::reply::Json, warp::Rejection>(warp::reply::json(&FeedSkeleton {
+9 -2
crates/feed/src/feed_handler.rs
··· 1 - use crate::types::{FeedResult, Request, Scrobble, Uri}; 1 + use crate::{ 2 + repo::RepoImpl, 3 + types::{FeedResult, Request, Scrobble, Uri}, 4 + }; 2 5 3 6 /// A feed handler is responsible for 4 7 /// - Storing and managing firehose input. ··· 6 9 pub trait FeedHandler { 7 10 fn insert_scrobble(&self, scrobble: Scrobble) -> impl std::future::Future<Output = ()> + Send; 8 11 fn delete_scrobble(&self, uri: Uri) -> impl std::future::Future<Output = ()> + Send; 9 - fn serve_feed(&self, request: Request) -> impl std::future::Future<Output = FeedResult> + Send; 12 + fn serve_feed( 13 + &self, 14 + repo: RepoImpl, 15 + request: Request, 16 + ) -> impl std::future::Future<Output = FeedResult> + Send; 10 17 }
+2 -2
crates/feed/src/lib.rs
··· 7 7 use crate::{ 8 8 feed::Feed, 9 9 feed_handler::FeedHandler, 10 - repo::duckdb::DB_PATH, 10 + repo::{duckdb::DB_PATH, RepoImpl}, 11 11 types::{FeedResult, Scrobble}, 12 12 }; 13 13 ··· 56 56 todo!() 57 57 } 58 58 59 - async fn serve_feed(&self, _request: types::Request) -> FeedResult { 59 + async fn serve_feed(&self, _repo: RepoImpl, _request: types::Request) -> FeedResult { 60 60 FeedResult { 61 61 feed: vec![], 62 62 cursor: None,
+9 -2
crates/feed/src/repo/duckdb/mod.rs
··· 1 + use std::sync::Arc; 2 + use std::sync::Mutex; 3 + 1 4 use crate::{ 2 5 repo::duckdb::{ 3 6 album::save_album, artist::save_artist, scrobble::save_scrobble, track::save_track, ··· 22 25 #[derive(Clone)] 23 26 pub struct DuckdbRepo { 24 27 pool: r2d2::Pool<DuckDBConnectionManager>, 28 + mutex: Arc<Mutex<()>>, 25 29 } 26 30 27 31 impl DuckdbRepo { 28 32 pub async fn new() -> Result<Self, Error> { 29 33 let manager = DuckDBConnectionManager::file(DB_PATH); 30 34 let pool = r2d2::Pool::builder().build(manager)?; 31 - Ok(Self { pool }) 35 + Ok(Self { 36 + pool, 37 + mutex: Arc::new(Mutex::new(())), 38 + }) 32 39 } 33 40 } 34 41 ··· 48 55 uri: &str, 49 56 record: ScrobbleRecord, 50 57 ) -> Result<(), anyhow::Error> { 51 - save_scrobble(self.pool.clone(), did, uri, record).await 58 + save_scrobble(self.pool.clone(), self.mutex.clone(), did, uri, record).await 52 59 } 53 60 54 61 async fn insert_track(self, uri: &str, record: SongRecord) -> Result<(), anyhow::Error> {
+5
crates/feed/src/repo/duckdb/scrobble.rs
··· 1 + use std::sync::Arc; 2 + 1 3 use anyhow::Error; 2 4 use duckdb::{params, OptionalExt}; 5 + use std::sync::Mutex; 3 6 4 7 use crate::{did::did_to_profile, r2d2_duckdb::DuckDBConnectionManager, types::ScrobbleRecord}; 5 8 6 9 pub async fn save_scrobble( 7 10 pool: r2d2::Pool<DuckDBConnectionManager>, 11 + mutex: Arc<Mutex<()>>, 8 12 did: &str, 9 13 uri: &str, 10 14 record: ScrobbleRecord, ··· 15 19 let uri = uri.to_string(); 16 20 17 21 tokio::task::spawn_blocking(move || -> Result<(), Error> { 22 + let _lock = mutex.lock().unwrap(); 18 23 let mut conn = pool.get()?; 19 24 let tx = conn.transaction()?; 20 25 let mut user = tx.prepare("SELECT id FROM users WHERE did = ?")?;
+12 -3
crates/feed/src/sync.rs
··· 27 27 let total_scrobbles = total_scrobbles.0; 28 28 tracing::info!(total = %total_scrobbles.magenta(), "Total scrobbles to sync"); 29 29 30 - for offset in (0..total_scrobbles).step_by(BATCH_SIZE as usize) { 30 + let start = env::var("SYNC_START_OFFSET") 31 + .ok() 32 + .and_then(|s| s.parse::<i64>().ok()) 33 + .unwrap_or(0); 34 + 35 + for offset in (start..total_scrobbles).step_by(BATCH_SIZE as usize) { 31 36 tracing::info!( 32 37 offset = %(offset).magenta(), 33 38 end = %(offset + BATCH_SIZE).magenta(), ··· 77 82 Ok::<(), Error>(()) 78 83 }); 79 84 80 - let mut i = 1; 85 + let mut i = env::var("SYNC_START_OFFSET") 86 + .ok() 87 + .and_then(|s| s.parse::<i64>().ok()) 88 + .unwrap_or(0) 89 + + 1; 81 90 82 91 let pool = PgPoolOptions::new() 83 92 .max_connections(5) ··· 140 149 repo.insert_scrobble(&did, &scrobble_uri, record).await?; 141 150 142 151 // sleep a bit to avoid overwhelming the database 143 - tokio::time::sleep(std::time::Duration::from_millis(600)).await; 152 + tokio::time::sleep(std::time::Duration::from_millis(300)).await; 144 153 145 154 i += 1; 146 155 }