A decentralized music tracking and discovery platform built on AT Protocol 🎵
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: remove main.rs file and associated Spotify user thread management logic

-235
-235
crates/spotify/src/main.rs
··· 1 - use std::{ 2 - collections::HashMap, 3 - env, 4 - sync::{atomic::AtomicBool, Arc, Mutex}, 5 - thread, 6 - }; 7 - 8 - use anyhow::Error; 9 - use async_nats::connect; 10 - use dotenv::dotenv; 11 - use owo_colors::OwoColorize; 12 - use rocksky_spotify::cache::Cache; 13 - use rocksky_spotify::{find_spotify_user, find_spotify_users, watch_currently_playing}; 14 - use sqlx::postgres::PgPoolOptions; 15 - use tokio_stream::StreamExt; 16 - 17 - #[tokio::main] 18 - async fn main() -> Result<(), Box<dyn std::error::Error>> { 19 - dotenv().ok(); 20 - let cache = Cache::new()?; 21 - let pool = PgPoolOptions::new() 22 - .max_connections(5) 23 - .connect(&env::var("XATA_POSTGRES_URL")?) 24 - .await?; 25 - 26 - let addr = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string()); 27 - let nc = connect(&addr).await?; 28 - println!("Connected to NATS server at {}", addr.bright_green()); 29 - 30 - let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?; 31 - println!("Subscribed to {}", "rocksky.spotify.user".bright_green()); 32 - 33 - let users = find_spotify_users(&pool, 0, 100).await?; 34 - println!("Found {} users", users.len().bright_green()); 35 - 36 - // Shared HashMap to manage threads and their stop flags 37 - let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> = 38 - Arc::new(Mutex::new(HashMap::new())); 39 - 40 - // Start threads for all users 41 - for user in users { 42 - let email = user.0.clone(); 43 - let token = user.1.clone(); 44 - let did = user.2.clone(); 45 - let stop_flag = Arc::new(AtomicBool::new(false)); 46 - let cache = cache.clone(); 47 - let nc = nc.clone(); 48 - let thread_map = Arc::clone(&thread_map); 49 - 50 - thread_map 51 - .lock() 52 - .unwrap() 53 - .insert(email.clone(), Arc::clone(&stop_flag)); 54 - 55 - thread::spawn(move || { 56 - let rt = tokio::runtime::Runtime::new().unwrap(); 57 - match rt.block_on(async { 58 - watch_currently_playing(email.clone(), token, did, stop_flag, cache.clone()) 59 - .await?; 60 - Ok::<(), Error>(()) 61 - }) { 62 - Ok(_) => { 63 - println!( 64 - "{} Thread for user {} has exited normally", 65 - format!("[{}]", email).bright_green(), 66 - email.bright_green() 67 - ); 68 - // restart the thread by publishing a message 69 - match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) { 70 - Ok(_) => { 71 - println!( 72 - "{} Published message to restart thread for user: {}", 73 - format!("[{}]", email).bright_green(), 74 - email.bright_green() 75 - ); 76 - } 77 - Err(e) => { 78 - println!( 79 - "{} Error publishing message to restart thread: {}", 80 - format!("[{}]", email).bright_green(), 81 - e.to_string().bright_red() 82 - ); 83 - } 84 - } 85 - } 86 - Err(e) => { 87 - println!( 88 - "{} Error starting thread for user: {} - {}", 89 - format!("[{}]", email).bright_green(), 90 - email.bright_green(), 91 - e.to_string().bright_red() 92 - ); 93 - 94 - // If there's an error, publish a message to restart the thread 95 - match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) { 96 - Ok(_) => { 97 - println!( 98 - "{} Published message to restart thread for user: {}", 99 - format!("[{}]", email).bright_green(), 100 - email.bright_green() 101 - ); 102 - } 103 - Err(e) => { 104 - println!( 105 - "{} Error publishing message to restart thread: {}", 106 - format!("[{}]", email).bright_green(), 107 - e.to_string().bright_red() 108 - ); 109 - } 110 - } 111 - } 112 - } 113 - }); 114 - } 115 - 116 - // Handle subscription messages 117 - while let Some(message) = sub.next().await { 118 - let user_id = String::from_utf8(message.payload.to_vec()).unwrap(); 119 - println!( 120 - "Received message to restart thread for user: {}", 121 - user_id.bright_green() 122 - ); 123 - 124 - let mut thread_map = thread_map.lock().unwrap(); 125 - 126 - // Check if the user exists in the thread map 127 - if let Some(stop_flag) = thread_map.get(&user_id) { 128 - // Stop the existing thread 129 - stop_flag.store(true, std::sync::atomic::Ordering::Relaxed); 130 - 131 - // Create a new stop flag and restart the thread 132 - let new_stop_flag = Arc::new(AtomicBool::new(false)); 133 - thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag)); 134 - 135 - let user = find_spotify_user(&pool, &user_id).await?; 136 - 137 - if user.is_none() { 138 - println!( 139 - "Spotify user not found: {}, skipping", 140 - user_id.bright_green() 141 - ); 142 - continue; 143 - } 144 - 145 - let user = user.unwrap(); 146 - 147 - let email = user.0.clone(); 148 - let token = user.1.clone(); 149 - let did = user.2.clone(); 150 - let cache = cache.clone(); 151 - 152 - thread::spawn(move || { 153 - let rt = tokio::runtime::Runtime::new().unwrap(); 154 - match rt.block_on(async { 155 - watch_currently_playing( 156 - email.clone(), 157 - token, 158 - did, 159 - new_stop_flag, 160 - cache.clone(), 161 - ) 162 - .await?; 163 - Ok::<(), Error>(()) 164 - }) { 165 - Ok(_) => {} 166 - Err(e) => { 167 - println!( 168 - "{} Error restarting thread for user: {} - {}", 169 - format!("[{}]", email).bright_green(), 170 - email.bright_green(), 171 - e.to_string().bright_red() 172 - ); 173 - } 174 - } 175 - }); 176 - 177 - println!("Restarted thread for user: {}", user_id.bright_green()); 178 - } else { 179 - println!( 180 - "No thread found for user: {}, starting new thread", 181 - user_id.bright_green() 182 - ); 183 - let user = find_spotify_user(&pool, &user_id).await?; 184 - if let Some(user) = user { 185 - let email = user.0.clone(); 186 - let token = user.1.clone(); 187 - let did = user.2.clone(); 188 - let stop_flag = Arc::new(AtomicBool::new(false)); 189 - let cache = cache.clone(); 190 - let nc = nc.clone(); 191 - 192 - thread_map.insert(email.clone(), Arc::clone(&stop_flag)); 193 - 194 - thread::spawn(move || { 195 - let rt = tokio::runtime::Runtime::new().unwrap(); 196 - match rt.block_on(async { 197 - watch_currently_playing( 198 - email.clone(), 199 - token, 200 - did, 201 - stop_flag, 202 - cache.clone(), 203 - ) 204 - .await?; 205 - Ok::<(), Error>(()) 206 - }) { 207 - Ok(_) => {} 208 - Err(e) => { 209 - println!( 210 - "{} Error starting thread for user: {} - {}", 211 - format!("[{}]", email).bright_green(), 212 - email.bright_green(), 213 - e.to_string().bright_red() 214 - ); 215 - match rt 216 - .block_on(nc.publish("rocksky.spotify.user", email.clone().into())) 217 - { 218 - Ok(_) => {} 219 - Err(e) => { 220 - println!( 221 - "{} Error publishing message to restart thread: {}", 222 - format!("[{}]", email).bright_green(), 223 - e.to_string().bright_red() 224 - ); 225 - } 226 - } 227 - } 228 - } 229 - }); 230 - } 231 - } 232 - } 233 - 234 - Ok(()) 235 - }