A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
98
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: improve error handling and logging in scrobble saving and synchronization

+33 -22
+10 -4
crates/feed/src/repo/duckdb/scrobble.rs
··· 18 18 19 19 let uri = uri.to_string(); 20 20 21 - tokio::task::spawn_blocking(move || -> Result<(), Error> { 21 + let handle = tokio::task::spawn_blocking(move || -> Result<(), Error> { 22 + tracing::info!("Inserting scrobble for user: {}, scrobble: {}", did, uri); 22 23 let _lock = mutex.lock().unwrap(); 23 24 let mut conn = pool.get()?; 24 25 let tx = conn.transaction()?; ··· 122 123 .as_ref() 123 124 .map(|tags| tags 124 125 .iter() 126 + .map(|tag| tag.replace('\'', "''")) 125 127 .map(|tag| format!("'{}'", tag)) 126 128 .collect::<Vec<_>>() 127 129 .join(", ")) ··· 341 343 Err(e) => tracing::error!(error = %e, "Error inserting scrobble"), 342 344 } 343 345 344 - tx.commit()?; 346 + match tx.commit() { 347 + Ok(_) => tracing::info!("Transaction committed successfully"), 348 + Err(e) => tracing::error!(error = %e, "Error committing transaction"), 349 + } 345 350 346 351 Ok::<(), Error>(()) 347 - }) 348 - .await??; 352 + }); 353 + 354 + handle.await??; 349 355 350 356 Ok(()) 351 357 }
+23 -18
crates/feed/src/sync.rs
··· 11 11 pub async fn sync_scrobbles(ddb: RepoImpl) -> Result<(), Error> { 12 12 tracing::info!("Starting scrobble synchronization..."); 13 13 14 + let pool = PgPoolOptions::new() 15 + .max_connections(10) 16 + .connect(&env::var("XATA_POSTGRES_URL")?) 17 + .await?; 18 + 14 19 let (tx, mut rx) = tokio::sync::mpsc::channel::<PgRow>(100); 15 20 21 + let pool_clone = pool.clone(); 16 22 let handle = tokio::spawn(async move { 17 - let pool = PgPoolOptions::new() 18 - .max_connections(5) 19 - .connect(&env::var("XATA_POSTGRES_URL")?) 20 - .await?; 21 - 22 23 const BATCH_SIZE: i64 = 1000; 23 24 24 - let total_scrobbles: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scrobbles") 25 - .fetch_one(&pool) 26 - .await?; 25 + let total_scrobbles: (i64,) = 26 + sqlx::query_as("SELECT COUNT(*) FROM scrobbles WHERE uri IS NOT NULL") 27 + .fetch_one(&pool_clone) 28 + .await?; 27 29 let total_scrobbles = total_scrobbles.0; 28 30 tracing::info!(total = %total_scrobbles.magenta(), "Total scrobbles to sync"); 29 31 ··· 65 67 LEFT JOIN artists ar ON s.artist_id = ar.xata_id 66 68 LEFT JOIN tracks t ON s.track_id = t.xata_id 67 69 LEFT JOIN users u ON s.user_id = u.xata_id 70 + WHERE s.uri IS NOT NULL 68 71 ORDER BY s.timestamp DESC 69 72 LIMIT $1 OFFSET $2 70 73 "#, 71 74 ) 72 75 .bind(BATCH_SIZE) 73 76 .bind(offset as i64) 74 - .fetch_all(&pool) 77 + .fetch_all(&pool_clone) 75 78 .await?; 76 79 77 80 for row in result { ··· 88 91 .unwrap_or(0) 89 92 + 1; 90 93 91 - let pool = PgPoolOptions::new() 92 - .max_connections(5) 93 - .connect(&env::var("XATA_POSTGRES_URL")?) 94 - .await?; 95 - 96 - let total_scrobbles: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scrobbles") 97 - .fetch_one(&pool) 98 - .await?; 94 + let total_scrobbles: (i64,) = 95 + sqlx::query_as("SELECT COUNT(*) FROM scrobbles WHERE uri IS NOT NULL") 96 + .fetch_one(&pool) 97 + .await?; 99 98 let total_scrobbles = total_scrobbles.0; 100 99 101 100 let repo = ddb.clone(); ··· 108 107 let scrobble_uri = row.get::<Option<String>, _>("uri"); 109 108 if scrobble_uri.is_none() { 110 109 tracing::warn!(count = %i.magenta(), "Skipping scrobble with no URI"); 110 + i += 1; 111 111 continue; 112 112 } 113 113 let scrobble_uri = scrobble_uri.unwrap(); ··· 146 146 }; 147 147 148 148 let repo = ddb.clone(); 149 - repo.insert_scrobble(&did, &scrobble_uri, record).await?; 149 + match repo.insert_scrobble(&did, &scrobble_uri, record).await { 150 + Ok(_) => tracing::info!(count = %i.magenta(), "Scrobble inserted successfully"), 151 + Err(e) => { 152 + tracing::error!(error = %e, "Error inserting scrobble"); 153 + } 154 + } 150 155 151 156 i += 1; 152 157 }