Rockbox open source high quality audio player as a Music Player Daemon
mpris rockbox mpd libadwaita audio rust zig deno
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #81 from tsirysndr/feat/mpd-idle-command

mpd: add support for mpd `idle` command

authored by

Tsiry Sandratraina and committed by
GitHub
03aff07d 77cc683a

+351 -364
+54 -58
crates/mpd/src/handlers/batch.rs
··· 1 1 use anyhow::Error; 2 - use tokio::{ 3 - io::{AsyncWriteExt, BufReader}, 4 - net::TcpStream, 5 - }; 2 + use tokio::sync::mpsc::Sender; 6 3 7 4 use crate::{parse_command, setup_context, Context}; 8 5 ··· 28 25 pub async fn handle_command_list_begin( 29 26 ctx: &mut Context, 30 27 request: &str, 31 - stream: &mut BufReader<TcpStream>, 28 + tx: Sender<String>, 32 29 ) -> Result<String, Error> { 33 30 let mut ctx = setup_context(true, Some(ctx.clone())).await?; 34 31 ··· 40 37 let mut response = String::new(); 41 38 for request in commands { 42 39 let command = parse_command(&request)?; 43 - response.push_str(&match_command(&command, &mut ctx, request, stream).await?); 40 + response.push_str(&match_command(&command, &mut ctx, request, tx.clone()).await?); 44 41 } 45 42 46 - stream.write_all(response.as_bytes()).await?; 43 + tx.send(response.clone()).await?; 47 44 48 45 Ok(response) 49 46 } ··· 51 48 pub async fn handle_command_list_ok_begin( 52 49 ctx: &mut Context, 53 50 request: &str, 54 - stream: &mut BufReader<TcpStream>, 51 + tx: Sender<String>, 55 52 ) -> Result<String, Error> { 56 53 let mut ctx = setup_context(true, Some(ctx.clone())).await?; 57 54 ··· 64 61 65 62 for request in commands { 66 63 let command = parse_command(&request)?; 67 - response.push_str(&match_command(&command, &mut ctx, request, stream).await?); 64 + response.push_str(&match_command(&command, &mut ctx, request, tx.clone()).await?); 68 65 } 69 66 70 67 let mut response = response.replace("OK\n", "list_OK\n"); 71 68 response.push_str("OK\n"); 72 - stream.write_all(response.as_bytes()).await?; 69 + tx.send(response.clone()).await?; 73 70 Ok(response) 74 71 } 75 72 ··· 77 74 command: &str, 78 75 ctx: &mut Context, 79 76 request: &str, 80 - stream: &mut BufReader<TcpStream>, 77 + tx: Sender<String>, 81 78 ) -> Result<String, Error> { 82 79 match command { 83 - "play" => handle_play(ctx, request, stream).await, 84 - "pause" => handle_pause(ctx, request, stream).await, 85 - "toggle" => handle_toggle(ctx, request, stream).await, 86 - "next" => handle_next(ctx, request, stream).await, 87 - "previous" => handle_previous(ctx, request, stream).await, 88 - "playid" => handle_playid(ctx, request, stream).await, 89 - "seek" => handle_seek(ctx, request, stream).await, 90 - "seekid" => handle_seekid(ctx, request, stream).await, 91 - "seekcur" => handle_seekcur(ctx, request, stream).await, 92 - "random" => handle_random(ctx, request, stream).await, 93 - "repeat" => handle_repeat(ctx, request, stream).await, 94 - "getvol" => handle_getvol(ctx, request, stream).await, 95 - "setvol" => handle_setvol(ctx, request, stream).await, 96 - "volume" => handle_setvol(ctx, request, stream).await, 97 - "single" => handle_single(ctx, request, stream).await, 98 - "shuffle" => handle_shuffle(ctx, request, stream).await, 99 - "add" => handle_add(ctx, request, stream).await, 100 - "addid" => handle_addid(ctx, request, stream).await, 101 - "playlistinfo" => handle_playlistinfo(ctx, request, stream).await, 102 - "delete" => handle_delete(ctx, request, stream).await, 103 - "clear" => handle_clear(ctx, request, stream).await, 104 - "move" => handle_move(ctx, request, stream).await, 105 - "list album" => handle_list_album(ctx, request, stream).await, 106 - "list artist" => handle_list_artist(ctx, request, stream).await, 107 - "list title" => handle_list_title(ctx, request, stream).await, 108 - "update" => handle_rescan(ctx, request, stream).await, 109 - "search" => handle_search(ctx, request, stream).await, 110 - "rescan" => handle_rescan(ctx, request, stream).await, 111 - "status" => handle_status(ctx, request, stream).await, 112 - "currentsong" => handle_currentsong(ctx, request, stream).await, 113 - "config" => handle_config(ctx, request, stream).await, 114 - "tagtypes " => handle_tagtypes(ctx, request, stream).await, 115 - "tagtypes clear" => handle_tagtypes_clear(ctx, request, stream).await, 116 - "tagtypes enable" => handle_tagtypes_enable(ctx, request, stream).await, 117 - "stats" => handle_stats(ctx, request, stream).await, 118 - "plchanges" => handle_playlistinfo(ctx, request, stream).await, 119 - "outputs" => handle_outputs(ctx, request, stream).await, 120 - "decoders" => handle_decoders(ctx, request, stream).await, 121 - "lsinfo" => handle_lsinfo(ctx, request, stream).await, 122 - "listall" => handle_listall(ctx, request, stream).await, 123 - "listallinfo" => handle_listallinfo(ctx, request, stream).await, 124 - "listfiles" => handle_listfiles(ctx, request, stream).await, 125 - "find artist" => handle_find_artist(ctx, request, stream).await, 126 - "find album" => handle_find_album(ctx, request, stream).await, 127 - "find title" => handle_find_title(ctx, request, stream).await, 80 + "play" => handle_play(ctx, request, tx.clone()).await, 81 + "pause" => handle_pause(ctx, request, tx.clone()).await, 82 + "toggle" => handle_toggle(ctx, request, tx.clone()).await, 83 + "next" => handle_next(ctx, request, tx.clone()).await, 84 + "previous" => handle_previous(ctx, request, tx.clone()).await, 85 + "playid" => handle_playid(ctx, request, tx.clone()).await, 86 + "seek" => handle_seek(ctx, request, tx.clone()).await, 87 + "seekid" => handle_seekid(ctx, request, tx.clone()).await, 88 + "seekcur" => handle_seekcur(ctx, request, tx.clone()).await, 89 + "random" => handle_random(ctx, request, tx.clone()).await, 90 + "repeat" => handle_repeat(ctx, request, tx.clone()).await, 91 + "getvol" => handle_getvol(ctx, request, tx.clone()).await, 92 + "setvol" => handle_setvol(ctx, request, tx.clone()).await, 93 + "volume" => handle_setvol(ctx, request, tx.clone()).await, 94 + "single" => handle_single(ctx, request, tx.clone()).await, 95 + "shuffle" => handle_shuffle(ctx, request, tx.clone()).await, 96 + "add" => handle_add(ctx, request, tx.clone()).await, 97 + "addid" => handle_addid(ctx, request, tx.clone()).await, 98 + "playlistinfo" => handle_playlistinfo(ctx, request, tx.clone()).await, 99 + "delete" => handle_delete(ctx, request, tx.clone()).await, 100 + "clear" => handle_clear(ctx, request, tx.clone()).await, 101 + "move" => handle_move(ctx, request, tx.clone()).await, 102 + "list album" => handle_list_album(ctx, request, tx.clone()).await, 103 + "list artist" => handle_list_artist(ctx, request, tx.clone()).await, 104 + "list title" => handle_list_title(ctx, request, tx.clone()).await, 105 + "update" => handle_rescan(ctx, request, tx.clone()).await, 106 + "search" => handle_search(ctx, request, tx.clone()).await, 107 + "rescan" => handle_rescan(ctx, request, tx.clone()).await, 108 + "status" => handle_status(ctx, request, tx.clone()).await, 109 + "currentsong" => handle_currentsong(ctx, request, tx.clone()).await, 110 + "config" => handle_config(ctx, request, tx.clone()).await, 111 + "tagtypes " => handle_tagtypes(ctx, request, tx.clone()).await, 112 + "tagtypes clear" => handle_tagtypes_clear(ctx, request, tx.clone()).await, 113 + "tagtypes enable" => handle_tagtypes_enable(ctx, request, tx.clone()).await, 114 + "stats" => handle_stats(ctx, request, tx.clone()).await, 115 + "plchanges" => handle_playlistinfo(ctx, request, tx.clone()).await, 116 + "outputs" => handle_outputs(ctx, request, tx.clone()).await, 117 + "decoders" => handle_decoders(ctx, request, tx.clone()).await, 118 + "lsinfo" => handle_lsinfo(ctx, request, tx.clone()).await, 119 + "listall" => handle_listall(ctx, request, tx.clone()).await, 120 + "listallinfo" => handle_listallinfo(ctx, request, tx.clone()).await, 121 + "listfiles" => handle_listfiles(ctx, request, tx.clone()).await, 122 + "find artist" => handle_find_artist(ctx, request, tx.clone()).await, 123 + "find album" => handle_find_album(ctx, request, tx.clone()).await, 124 + "find title" => handle_find_title(ctx, request, tx.clone()).await, 128 125 _ => { 129 126 println!("Unhandled command: {}", request); 130 127 if !ctx.batch { 131 - stream 132 - .write_all(b"ACK [5@0] {unhandled} unknown command\n") 128 + tx.send("ACK [5@0] {unhandled} unknown command\n".to_string()) 133 129 .await?; 134 130 } 135 131 Ok("ACK [5@0] {unhandled} unknown command\n".to_string())
+12 -18
crates/mpd/src/handlers/browse.rs
··· 5 5 use regex::Regex; 6 6 use rockbox_library::repo; 7 7 use rockbox_settings::get_music_dir; 8 - use tokio::{ 9 - io::{AsyncWriteExt, BufReader}, 10 - net::TcpStream, 11 - }; 8 + use tokio::sync::mpsc::Sender; 12 9 13 10 pub async fn handle_lsinfo( 14 11 ctx: &mut Context, 15 12 request: &str, 16 - stream: &mut BufReader<TcpStream>, 13 + tx: Sender<String>, 17 14 ) -> Result<String, Error> { 18 15 repo::track::all(ctx.pool.clone()).await?; 19 16 ··· 33 30 // verify if path is a file or directory or doesn't exist 34 31 if fs::metadata(path).is_err() { 35 32 if !ctx.batch { 36 - stream 37 - .write_all(b"ACK [50@0] {lsinfo} No such file or directory\n") 33 + tx.send("ACK [50@0] {lsinfo} No such file or directory\n".to_string()) 38 34 .await?; 39 35 } 40 36 return Ok("ACK [50@0] {lsinfo} No such file or directory\n".to_string()); ··· 53 49 } 54 50 55 51 if !ctx.batch { 56 - stream.write_all(response.as_bytes()).await?; 52 + tx.send(response.clone()).await?; 57 53 } 58 54 59 55 Ok(response) ··· 62 58 pub async fn handle_listall( 63 59 ctx: &mut Context, 64 60 _request: &str, 65 - stream: &mut BufReader<TcpStream>, 61 + tx: Sender<String>, 66 62 ) -> Result<String, Error> { 67 63 let mut response: String = "".to_string(); 68 64 let music_dir = get_music_dir()?; ··· 74 70 } 75 71 76 72 if !ctx.batch { 77 - stream.write_all(response.as_bytes()).await?; 73 + tx.send(response.clone()).await?; 78 74 } 79 75 80 76 Ok(response) ··· 83 79 pub async fn handle_listallinfo( 84 80 ctx: &mut Context, 85 81 _request: &str, 86 - stream: &mut BufReader<TcpStream>, 82 + tx: Sender<String>, 87 83 ) -> Result<String, Error> { 88 84 repo::track::all(ctx.pool.clone()).await?; 89 85 let music_dir = get_music_dir()?; ··· 92 88 // verify if path is a file or directory or doesn't exist 93 89 if fs::metadata(&path).is_err() { 94 90 if !ctx.batch { 95 - stream 96 - .write_all(b"ACK [50@0] {lsinfo} No such file or directory\n") 91 + tx.send("ACK [50@0] {lsinfo} No such file or directory\n".to_string()) 97 92 .await?; 98 93 } 99 94 return Ok("ACK [50@0] {lsinfo} No such file or directory\n".to_string()); ··· 106 101 response.push_str("OK\n"); 107 102 108 103 if !ctx.batch { 109 - stream.write_all(response.as_bytes()).await?; 104 + tx.send(response.clone()).await?; 110 105 } 111 106 112 107 Ok(response) ··· 115 110 pub async fn handle_listfiles( 116 111 ctx: &mut Context, 117 112 request: &str, 118 - stream: &mut BufReader<TcpStream>, 113 + tx: Sender<String>, 119 114 ) -> Result<String, Error> { 120 115 let request = request.trim(); 121 116 let re = Regex::new(r#"^([\w-]+)(?:\s+"([^"]*)")?$"#).unwrap(); ··· 133 128 // verify if path is a file or directory or doesn't exist 134 129 if fs::metadata(&path).is_err() { 135 130 if !ctx.batch { 136 - stream 137 - .write_all(b"ACK [50@0] {lsinfo} No such file or directory\n") 131 + tx.send("ACK [50@0] {lsinfo} No such file or directory\n".to_string()) 138 132 .await?; 139 133 } 140 134 return Ok("ACK [50@0] {lsinfo} No such file or directory\n".to_string()); ··· 153 147 } 154 148 155 149 if !ctx.batch { 156 - stream.write_all(response.as_bytes()).await?; 150 + tx.send(response.clone()).await?; 157 151 } 158 152 159 153 Ok(response)
+32 -40
crates/mpd/src/handlers/library.rs
··· 9 9 ScanLibraryRequest, SearchRequest, 10 10 }; 11 11 use rockbox_settings::get_music_dir; 12 - use tokio::{ 13 - io::{AsyncWriteExt, BufReader}, 14 - net::TcpStream, 15 - }; 12 + use tokio::sync::mpsc::Sender; 16 13 17 14 use crate::Context; 18 15 19 16 pub async fn handle_list_album( 20 17 ctx: &mut Context, 21 18 request: &str, 22 - stream: &mut BufReader<TcpStream>, 19 + tx: Sender<String>, 23 20 ) -> Result<String, Error> { 24 21 let query = request.replace("list album", "").replace("list Album", ""); 25 22 let query = query.trim(); ··· 46 43 let response = format!("{}OK\n", response); 47 44 48 45 if !ctx.batch { 49 - stream.write_all(response.as_bytes()).await?; 46 + tx.send(response.clone()).await?; 50 47 } 51 48 52 49 Ok(response) ··· 55 52 pub async fn handle_list_artist( 56 53 ctx: &mut Context, 57 54 _request: &str, 58 - stream: &mut BufReader<TcpStream>, 55 + tx: Sender<String>, 59 56 ) -> Result<String, Error> { 60 57 let response = ctx.library.get_artists(GetArtistsRequest {}).await?; 61 58 let response = response.into_inner(); ··· 66 63 .collect::<String>(); 67 64 let response = format!("{}OK\n", response); 68 65 if !ctx.batch { 69 - stream.write_all(response.as_bytes()).await?; 66 + tx.send(response.clone()).await?; 70 67 } 71 68 Ok(response) 72 69 } ··· 74 71 pub async fn handle_list_title( 75 72 ctx: &mut Context, 76 73 _request: &str, 77 - stream: &mut BufReader<TcpStream>, 74 + tx: Sender<String>, 78 75 ) -> Result<String, Error> { 79 76 let response = ctx.library.get_tracks(GetTracksRequest {}).await?; 80 77 let response = response.into_inner(); ··· 85 82 .collect::<String>(); 86 83 let response = format!("{}OK\n", response); 87 84 if !ctx.batch { 88 - stream.write_all(response.as_bytes()).await?; 85 + tx.send(response.clone()).await?; 89 86 } 90 87 Ok(response) 91 88 } ··· 93 90 pub async fn handle_search( 94 91 ctx: &mut Context, 95 92 request: &str, 96 - stream: &mut BufReader<TcpStream>, 93 + tx: Sender<String>, 97 94 ) -> Result<String, Error> { 98 95 let mut term = request 99 96 .trim_matches('"') ··· 133 130 .collect::<String>(); 134 131 let response = format!("{}OK\n", response); 135 132 if !ctx.batch { 136 - stream.write_all(response.as_bytes()).await?; 133 + tx.send(response.clone()).await?; 137 134 } 138 135 return Ok(response); 139 136 } ··· 159 156 .collect::<String>(); 160 157 let response = format!("{}OK\n", response); 161 158 if !ctx.batch { 162 - stream.write_all(response.as_bytes()).await?; 159 + tx.send(response.clone()).await?; 163 160 } 164 161 Ok(response) 165 162 } ··· 167 164 pub async fn handle_rescan( 168 165 ctx: &mut Context, 169 166 request: &str, 170 - stream: &mut BufReader<TcpStream>, 167 + tx: Sender<String>, 171 168 ) -> Result<String, Error> { 172 169 let response = ctx 173 170 .settings ··· 188 185 .await?; 189 186 190 187 if !ctx.batch { 191 - stream.write_all(b"OK\n").await?; 188 + tx.send("OK\n".to_string()).await?; 192 189 } 193 190 Ok("OK\n".to_string()) 194 191 } ··· 196 193 pub async fn handle_config( 197 194 ctx: &mut Context, 198 195 _request: &str, 199 - stream: &mut BufReader<TcpStream>, 196 + tx: Sender<String>, 200 197 ) -> Result<String, Error> { 201 198 let response = "ACK [4@0] {config} Command only permitted to local clients"; 202 199 if !ctx.batch { 203 - stream.write_all(response.as_bytes()).await?; 200 + tx.send(response.to_string()).await?; 204 201 } 205 202 206 203 Ok(response.to_string()) ··· 209 206 pub async fn handle_tagtypes( 210 207 ctx: &mut Context, 211 208 _request: &str, 212 - stream: &mut BufReader<TcpStream>, 209 + tx: Sender<String>, 213 210 ) -> Result<String, Error> { 214 211 let response = format!( 215 212 "Tagtype: Artist\nTagtype: Album\nTagtype: Title\nTagtype: Track\nTagtype: Date\nOK\n" 216 213 ); 217 214 218 215 if !ctx.batch { 219 - stream.write_all(response.as_bytes()).await?; 216 + tx.send(response.clone()).await?; 220 217 } 221 218 222 219 Ok(response) ··· 225 222 pub async fn handle_tagtypes_clear( 226 223 ctx: &mut Context, 227 224 _request: &str, 228 - stream: &mut BufReader<TcpStream>, 225 + tx: Sender<String>, 229 226 ) -> Result<String, Error> { 230 227 let response = format!("OK\n"); 231 228 232 229 if !ctx.batch { 233 - stream.write_all(response.as_bytes()).await?; 230 + tx.send(response.clone()).await?; 234 231 } 235 232 236 233 Ok(response) ··· 239 236 pub async fn handle_tagtypes_enable( 240 237 ctx: &mut Context, 241 238 _request: &str, 242 - stream: &mut BufReader<TcpStream>, 239 + tx: Sender<String>, 243 240 ) -> Result<String, Error> { 244 241 let response = format!("OK\n"); 245 242 246 243 if !ctx.batch { 247 - stream.write_all(response.as_bytes()).await?; 244 + tx.send(response.to_string()).await?; 248 245 } 249 246 250 247 Ok("".to_string()) ··· 253 250 pub async fn handle_stats( 254 251 ctx: &mut Context, 255 252 _request: &str, 256 - stream: &mut BufReader<TcpStream>, 253 + tx: Sender<String>, 257 254 ) -> Result<String, Error> { 258 255 let response = ctx.library.get_albums(GetAlbumsRequest {}).await?; 259 256 let response = response.into_inner(); ··· 270 267 ); 271 268 272 269 if !ctx.batch { 273 - stream.write_all(response.as_bytes()).await?; 270 + tx.send(response.clone()).await?; 274 271 } 275 272 276 273 Ok(response) ··· 279 276 pub async fn handle_find_artist( 280 277 ctx: &mut Context, 281 278 request: &str, 282 - stream: &mut BufReader<TcpStream>, 279 + tx: Sender<String>, 283 280 ) -> Result<String, Error> { 284 281 let re = Regex::new(r#"(?i)(artist|album|date)\s+\"([^\"]+)\""#).unwrap(); 285 282 let mut fields = HashMap::new(); ··· 309 306 build_file_metadata(tracks, &mut response).await?; 310 307 311 308 if !ctx.batch { 312 - stream.write_all(response.as_bytes()).await?; 309 + tx.send(response.clone()).await?; 313 310 } 314 311 315 312 Ok(response) ··· 318 315 pub async fn handle_find_album( 319 316 ctx: &mut Context, 320 317 request: &str, 321 - stream: &mut BufReader<TcpStream>, 318 + tx: Sender<String>, 322 319 ) -> Result<String, Error> { 323 320 let arg = request.replace("find album ", "").replace("find Album", ""); 324 321 let arg = arg.trim(); ··· 330 327 build_file_metadata(tracks, &mut response).await?; 331 328 332 329 if !ctx.batch { 333 - stream.write_all(response.as_bytes()).await?; 330 + tx.send(response.clone()).await?; 334 331 } 335 332 336 333 Ok(response) ··· 339 336 pub async fn handle_find_title( 340 337 ctx: &mut Context, 341 338 request: &str, 342 - stream: &mut BufReader<TcpStream>, 339 + tx: Sender<String>, 343 340 ) -> Result<String, Error> { 344 341 let arg = request 345 342 .replace("find title ", "") ··· 353 350 build_file_metadata(tracks, &mut response).await?; 354 351 355 352 if !ctx.batch { 356 - stream.write_all(response.as_bytes()).await?; 353 + tx.send(response.clone()).await?; 357 354 } 358 355 359 356 Ok(response) ··· 362 359 pub async fn handle_find( 363 360 ctx: &mut Context, 364 361 request: &str, 365 - stream: &mut BufReader<TcpStream>, 362 + tx: Sender<String>, 366 363 ) -> Result<String, Error> { 367 364 let arg = request.replace("find ", ""); 368 365 let arg = arg.trim(); ··· 371 368 let mut parser = Parser::new(&arg); 372 369 match parser.parse() { 373 370 Ok(expr) => { 374 - execute(ctx, &expr, stream).await?; 371 + execute(ctx, &expr, tx).await?; 375 372 } 376 373 Err(e) => return Err(Error::msg(e)), 377 374 } 378 375 Ok("".to_string()) 379 376 } 380 377 381 - async fn execute( 382 - ctx: &mut Context, 383 - expr: &Expression, 384 - stream: &mut BufReader<TcpStream>, 385 - ) -> Result<(), Error> { 378 + async fn execute(ctx: &mut Context, expr: &Expression, tx: Sender<String>) -> Result<(), Error> { 386 379 let mut columns = HashMap::new(); 387 380 columns.insert("Title".to_string(), "title".to_string()); 388 381 columns.insert("Artist".to_string(), "artist".to_string()); ··· 408 401 let mut response: String = "".to_string(); 409 402 410 403 build_file_metadata(tracks, &mut response).await?; 411 - 412 - stream.write_all(response.as_bytes()).await?; 404 + tx.send(response).await?; 413 405 Ok(()) 414 406 } 415 407
+41
crates/mpd/src/handlers/mod.rs
··· 4 4 pub mod playback; 5 5 pub mod queue; 6 6 pub mod system; 7 + 8 + #[derive(Debug, Clone, PartialEq)] 9 + pub enum Subsystem { 10 + Database, 11 + Update, 12 + StoredPlaylist, 13 + Playlist, 14 + Player, 15 + Mixer, 16 + Output, 17 + Options, 18 + Partition, 19 + Sticker, 20 + Subscription, 21 + Message, 22 + Neighbor, 23 + Mount, 24 + NoIdle, 25 + } 26 + 27 + impl ToString for Subsystem { 28 + fn to_string(&self) -> String { 29 + match self { 30 + Subsystem::Database => "database".to_string(), 31 + Subsystem::Update => "update".to_string(), 32 + Subsystem::StoredPlaylist => "stored_playlist".to_string(), 33 + Subsystem::Playlist => "playlist".to_string(), 34 + Subsystem::Player => "player".to_string(), 35 + Subsystem::Mixer => "mixer".to_string(), 36 + Subsystem::Output => "output".to_string(), 37 + Subsystem::Options => "options".to_string(), 38 + Subsystem::Partition => "partition".to_string(), 39 + Subsystem::Sticker => "sticker".to_string(), 40 + Subsystem::Subscription => "subscription".to_string(), 41 + Subsystem::Message => "message".to_string(), 42 + Subsystem::Neighbor => "neighbor".to_string(), 43 + Subsystem::Mount => "mount".to_string(), 44 + Subsystem::NoIdle => "noidle".to_string(), 45 + } 46 + } 47 + }
+55 -66
crates/mpd/src/handlers/playback.rs
··· 3 3 AdjustVolumeRequest, NextRequest, PauseRequest, PlayRequest, PreviousRequest, ResumeRequest, 4 4 SaveSettingsRequest, StartRequest, 5 5 }; 6 - use tokio::{ 7 - io::{AsyncWriteExt, BufReader}, 8 - net::TcpStream, 9 - }; 6 + use tokio::sync::mpsc::Sender; 10 7 11 8 use crate::Context; 12 9 10 + use super::Subsystem; 11 + 13 12 pub async fn handle_play( 14 13 ctx: &mut Context, 15 14 _request: &str, 16 - stream: &mut BufReader<TcpStream>, 15 + tx: Sender<String>, 17 16 ) -> Result<String, Error> { 18 17 ctx.playback.resume(ResumeRequest {}).await?; 19 - match ctx.event_sender.send("player".to_string()) { 18 + match ctx.event_sender.send(Subsystem::Player) { 20 19 Ok(_) => {} 21 20 Err(_) => {} 22 21 } 23 22 24 23 if !ctx.batch { 25 - stream.write_all(b"OK\n").await?; 24 + tx.send("OK\n".to_string()).await?; 26 25 } 27 26 28 27 Ok("OK\n".to_string()) ··· 31 30 pub async fn handle_pause( 32 31 ctx: &mut Context, 33 32 _request: &str, 34 - stream: &mut BufReader<TcpStream>, 33 + tx: Sender<String>, 35 34 ) -> Result<String, Error> { 36 35 let playback_status = ctx.playback_status.lock().await; 37 36 let status = playback_status.as_ref().map(|x| x.status); ··· 44 43 ctx.playback.resume(ResumeRequest {}).await?; 45 44 } 46 45 _ => { 47 - stream 48 - .write_all(b"ACK [2@0] {pause} no song is playing\n") 46 + tx.send("ACK [2@0] {pause} no song is playing\n".to_string()) 49 47 .await?; 50 48 } 51 49 } 52 50 53 - match ctx.event_sender.send("player".to_string()) { 51 + match ctx.event_sender.send(Subsystem::Player) { 54 52 Ok(_) => {} 55 53 Err(_) => {} 56 54 } ··· 61 59 pub async fn handle_toggle( 62 60 ctx: &mut Context, 63 61 _request: &str, 64 - stream: &mut BufReader<TcpStream>, 62 + tx: Sender<String>, 65 63 ) -> Result<String, Error> { 66 64 let playback_status = ctx.playback_status.lock().await; 67 65 let playback_status = playback_status.as_ref().map(|x| x.status); ··· 74 72 ctx.playback.resume(ResumeRequest {}).await?; 75 73 } 76 74 _ => { 77 - stream 78 - .write_all(b"ACK [2@0] {toggle} no song is playing\n") 75 + tx.send("ACK [2@0] {toggle} no song is playing\n".to_string()) 79 76 .await?; 80 77 } 81 78 } 82 79 if !ctx.batch { 83 - stream.write_all(b"OK\n").await?; 80 + tx.send("OK\n".to_string()).await?; 84 81 } 85 82 Ok("OK\n".to_string()) 86 83 } ··· 88 85 pub async fn handle_status( 89 86 ctx: &mut Context, 90 87 _request: &str, 91 - stream: &mut BufReader<TcpStream>, 88 + tx: Sender<String>, 92 89 ) -> Result<String, Error> { 93 90 let playback_status = ctx.playback_status.lock().await; 94 91 let playback_status = playback_status.as_ref().map(|x| x.status); ··· 127 124 status, repeat, random, volume, 128 125 ); 129 126 if !ctx.batch { 130 - stream.write_all(response.as_bytes()).await?; 127 + tx.send(response.clone()).await?; 131 128 } 132 129 return Ok(response); 133 130 } ··· 154 151 status, repeat, single, random, time, elapsed, duration, volume, audio, bitrate, 155 152 ); 156 153 if !ctx.batch { 157 - stream.write_all(response.as_bytes()).await?; 154 + tx.send(response.clone()).await?; 158 155 } 159 156 return Ok(response); 160 157 } ··· 170 167 ); 171 168 172 169 if !ctx.batch { 173 - stream.write_all(response.as_bytes()).await?; 170 + tx.send(response.clone()).await?; 174 171 } 175 172 Ok(response) 176 173 } ··· 178 175 pub async fn handle_next( 179 176 ctx: &mut Context, 180 177 _request: &str, 181 - stream: &mut BufReader<TcpStream>, 178 + tx: Sender<String>, 182 179 ) -> Result<String, Error> { 183 180 ctx.playback.next(NextRequest {}).await?; 184 - match ctx.event_sender.send("player".to_string()) { 181 + match ctx.event_sender.send(Subsystem::Player) { 185 182 Ok(_) => {} 186 183 Err(_) => {} 187 184 } 188 185 if !ctx.batch { 189 - stream.write_all(b"OK\n").await?; 186 + tx.send("OK\n".to_string()).await?; 190 187 } 191 188 Ok("OK\n".to_string()) 192 189 } ··· 194 191 pub async fn handle_previous( 195 192 ctx: &mut Context, 196 193 _request: &str, 197 - stream: &mut BufReader<TcpStream>, 194 + tx: Sender<String>, 198 195 ) -> Result<String, Error> { 199 196 ctx.playback.previous(PreviousRequest {}).await?; 200 - match ctx.event_sender.send("player".to_string()) { 197 + match ctx.event_sender.send(Subsystem::Player) { 201 198 Ok(_) => {} 202 199 Err(_) => {} 203 200 } 204 201 205 202 if !ctx.batch { 206 - stream.write_all(b"OK\n").await?; 203 + tx.send("OK\n".to_string()).await?; 207 204 } 208 205 209 206 Ok("OK\n".to_string()) ··· 212 209 pub async fn handle_playid( 213 210 ctx: &mut Context, 214 211 request: &str, 215 - stream: &mut BufReader<TcpStream>, 212 + tx: Sender<String>, 216 213 ) -> Result<String, Error> { 217 214 let arg = request.split_whitespace().nth(1); 218 215 219 216 if arg.is_none() { 220 - stream 221 - .write_all(b"ACK [2@0] {playid} incorrect arguments\n") 217 + tx.send("ACK [2@0] {playid} incorrect arguments\n".to_string()) 222 218 .await?; 223 219 return Ok("ACK [2@0] {playid} incorrect arguments\n".to_string()); 224 220 } ··· 229 225 let arg = arg.parse::<i32>(); 230 226 231 227 if arg.is_err() { 232 - stream 233 - .write_all(b"ACK [2@0] {playid} incorrect arguments\n") 228 + tx.send("ACK [2@0] {playid} incorrect arguments\n".to_string()) 234 229 .await?; 235 230 return Ok("ACK [2@0] {playid} incorrect arguments\n".to_string()); 236 231 } ··· 245 240 .await?; 246 241 247 242 if !ctx.batch { 248 - stream.write_all(b"OK\n").await?; 243 + tx.send("OK\n".to_string()).await?; 249 244 } 250 245 251 246 Ok("OK\n".to_string()) ··· 254 249 pub async fn handle_seek( 255 250 ctx: &mut Context, 256 251 request: &str, 257 - stream: &mut BufReader<TcpStream>, 252 + tx: Sender<String>, 258 253 ) -> Result<String, Error> { 259 254 // TODO: Implement seek 260 255 println!("{}", request); 261 256 262 257 if !ctx.batch { 263 - stream.write_all(b"OK\n").await?; 258 + tx.send("OK\n".to_string()).await?; 264 259 } 265 260 266 261 Ok("OK\n".to_string()) ··· 269 264 pub async fn handle_seekid( 270 265 ctx: &mut Context, 271 266 request: &str, 272 - stream: &mut BufReader<TcpStream>, 267 + tx: Sender<String>, 273 268 ) -> Result<String, Error> { 274 269 // TODO: Implement seekid 275 270 println!("{}", request); 276 271 277 272 if !ctx.batch { 278 - stream.write_all(b"OK\n").await?; 273 + tx.send("OK\n".to_string()).await?; 279 274 } 280 275 281 276 Ok("OK\n".to_string()) ··· 284 279 pub async fn handle_seekcur( 285 280 ctx: &mut Context, 286 281 request: &str, 287 - stream: &mut BufReader<TcpStream>, 282 + tx: Sender<String>, 288 283 ) -> Result<String, Error> { 289 284 let arg = request.split_whitespace().nth(1); 290 285 if arg.is_none() { 291 - stream 292 - .write_all(b"ACK [2@0] {seekcur} incorrect arguments\n") 286 + tx.send("ACK [2@0] {seekcur} incorrect arguments\n".to_string()) 293 287 .await?; 294 288 return Ok("ACK [2@0] {seekcur} incorrect arguments\n".to_string()); 295 289 } ··· 304 298 }) 305 299 .await?; 306 300 307 - match ctx.event_sender.send("player".to_string()) { 301 + match ctx.event_sender.send(Subsystem::Player) { 308 302 Ok(_) => {} 309 303 Err(_) => {} 310 304 } 311 305 312 306 if !ctx.batch { 313 - stream.write_all(b"OK\n").await?; 307 + tx.send("OK\n".to_string()).await?; 314 308 } 315 309 Ok("OK\n".to_string()) 316 310 } ··· 318 312 pub async fn handle_random( 319 313 ctx: &mut Context, 320 314 request: &str, 321 - stream: &mut BufReader<TcpStream>, 315 + tx: Sender<String>, 322 316 ) -> Result<String, Error> { 323 317 let arg = request.split_whitespace().nth(1); 324 318 if arg.is_none() { 325 319 if !ctx.batch { 326 - stream 327 - .write_all(b"ACK [2@0] {random} incorrect arguments\n") 320 + tx.send("ACK [2@0] {random} incorrect arguments\n".to_string()) 328 321 .await?; 329 322 } 330 323 return Ok("ACK [2@0] {random} incorrect arguments\n".to_string()); ··· 337 330 }) 338 331 .await?; 339 332 if !ctx.batch { 340 - stream.write_all(b"OK\n").await?; 333 + tx.send("OK\n".to_string()).await?; 341 334 } 342 335 Ok("OK\n".to_string()) 343 336 } ··· 345 338 pub async fn handle_repeat( 346 339 ctx: &mut Context, 347 340 request: &str, 348 - stream: &mut BufReader<TcpStream>, 341 + tx: Sender<String>, 349 342 ) -> Result<String, Error> { 350 343 let arg = request.split_whitespace().nth(1); 351 344 if arg.is_none() { 352 345 if !ctx.batch { 353 - stream 354 - .write_all(b"ACK [2@0] {repeat} incorrect arguments\n") 346 + tx.send("ACK [2@0] {repeat} incorrect arguments\n".to_string()) 355 347 .await?; 356 348 } 357 349 return Ok("ACK [2@0] {repeat} incorrect arguments\n".to_string()); ··· 367 359 }, 368 360 _ => { 369 361 if !ctx.batch { 370 - stream 371 - .write_all(b"ACK [2@0] {repeat} incorrect arguments\n") 362 + tx.send("ACK [2@0] {repeat} incorrect arguments\n".to_string()) 372 363 .await?; 373 364 } 374 365 return Ok("ACK [2@0] {repeat} incorrect arguments\n".to_string()); ··· 381 372 }) 382 373 .await?; 383 374 if !ctx.batch { 384 - stream.write_all(b"OK\n").await?; 375 + tx.send("OK\n".to_string()).await?; 385 376 } 386 377 Ok("OK\n".to_string()) 387 378 } ··· 389 380 pub async fn handle_getvol( 390 381 ctx: &mut Context, 391 382 _request: &str, 392 - stream: &mut BufReader<TcpStream>, 383 + tx: Sender<String>, 393 384 ) -> Result<String, Error> { 394 385 let settings = rockbox_sys::settings::get_global_settings(); 395 386 let volume = settings.volume; ··· 401 392 let response = format!("volume: {}\nOK\n", volume); 402 393 403 394 if !ctx.batch { 404 - stream.write_all(response.as_bytes()).await?; 395 + tx.send(response.clone()).await?; 405 396 } 406 397 407 398 Ok(response) ··· 410 401 pub async fn handle_setvol( 411 402 ctx: &mut Context, 412 403 request: &str, 413 - stream: &mut BufReader<TcpStream>, 404 + tx: Sender<String>, 414 405 ) -> Result<String, Error> { 415 406 let settings = rockbox_sys::settings::get_global_settings(); 416 407 let volume = settings.volume as i32; 417 408 let arg = request.split_whitespace().nth(1); 418 409 if arg.is_none() { 419 410 if !ctx.batch { 420 - stream 421 - .write_all(b"ACK [2@0] {setvol} incorrect arguments\n") 411 + tx.send("ACK [2@0] {setvol} incorrect arguments\n".to_string()) 422 412 .await?; 423 413 } 424 414 return Ok("ACK [2@0] {setvol} incorrect arguments\n".to_string()); ··· 435 425 .adjust_volume(AdjustVolumeRequest { steps }) 436 426 .await?; 437 427 if !ctx.batch { 438 - stream.write_all(b"OK\n").await?; 428 + tx.send("OK\n".to_string()).await?; 439 429 } 440 430 Ok("OK\n".to_string()) 441 431 } ··· 443 433 pub async fn handle_single( 444 434 ctx: &mut Context, 445 435 request: &str, 446 - stream: &mut BufReader<TcpStream>, 436 + tx: Sender<String>, 447 437 ) -> Result<String, Error> { 448 438 let arg = request.split_whitespace().nth(1); 449 439 if arg.is_none() { 450 440 if !ctx.batch { 451 - stream 452 - .write_all(b"ACK [2@0] {single} incorrect arguments\n") 441 + tx.send("ACK [2@0] {single} incorrect arguments\n".to_string()) 453 442 .await?; 454 443 } 455 444 return Ok("ACK [2@0] {single} incorrect arguments\n".to_string()); ··· 458 447 let mut single = ctx.single.lock().await; 459 448 *single = arg.unwrap().to_string(); 460 449 if !ctx.batch { 461 - stream.write_all(b"OK\n").await?; 450 + tx.send("OK\n".to_string()).await?; 462 451 } 463 452 Ok("OK\n".to_string()) 464 453 } ··· 466 455 pub async fn handle_currentsong( 467 456 ctx: &mut Context, 468 457 _request: &str, 469 - stream: &mut BufReader<TcpStream>, 458 + tx: Sender<String>, 470 459 ) -> Result<String, Error> { 471 460 let current = ctx.current_track.lock().await; 472 461 if current.is_none() { 473 462 let response = "OK\n".to_string(); 474 463 if !ctx.batch { 475 - stream.write_all(response.as_bytes()).await?; 464 + tx.send(response.clone()).await?; 476 465 } 477 466 return Ok(response); 478 467 } ··· 492 481 (current.length / 1000) as i64, 493 482 ); 494 483 if !ctx.batch { 495 - stream.write_all(response.as_bytes()).await?; 484 + tx.send(response.clone()).await?; 496 485 } 497 486 return Ok(response); 498 487 } ··· 511 500 (current.length / 1000) as i64, 512 501 ); 513 502 if !ctx.batch { 514 - stream.write_all(response.as_bytes()).await?; 503 + tx.send(response.clone()).await?; 515 504 } 516 505 Ok(response) 517 506 } ··· 519 508 pub async fn handle_outputs( 520 509 ctx: &mut Context, 521 510 _request: &str, 522 - stream: &mut BufReader<TcpStream>, 511 + tx: Sender<String>, 523 512 ) -> Result<String, Error> { 524 513 let response = 525 514 "outputid: 0\noutputname: default detected output\nplugin: pulse\noutputenabled: 1\nOK\n" 526 515 .to_string(); 527 516 if !ctx.batch { 528 - stream.write_all(response.as_bytes()).await?; 517 + tx.send(response.clone()).await?; 529 518 } 530 519 Ok(response) 531 520 }
+37 -69
crates/mpd/src/handlers/queue.rs
··· 1 1 use std::fs; 2 2 3 - use crate::{consts::PLAYLIST_INSERT_LAST, Context}; 3 + use crate::{consts::PLAYLIST_INSERT_LAST, handlers::Subsystem, Context}; 4 4 use anyhow::Error; 5 5 use regex::Regex; 6 6 use rockbox_rpc::api::rockbox::v1alpha1::{ 7 7 GetGlobalSettingsRequest, InsertDirectoryRequest, InsertTracksRequest, RemoveAllTracksRequest, 8 8 RemoveTracksRequest, ShufflePlaylistRequest, StartRequest, 9 9 }; 10 - use tokio::{ 11 - io::{AsyncWriteExt, BufReader}, 12 - net::TcpStream, 13 - }; 10 + use tokio::sync::mpsc::Sender; 14 11 15 12 pub async fn handle_shuffle( 16 13 ctx: &mut Context, 17 14 _request: &str, 18 - stream: &mut BufReader<TcpStream>, 15 + tx: Sender<String>, 19 16 ) -> Result<String, Error> { 20 - let mut idle = ctx.idle.lock().await; 21 - *idle = true; 22 - 23 17 ctx.playlist 24 18 .shuffle_playlist(ShufflePlaylistRequest { start_index: 0 }) 25 19 .await?; 26 20 if !ctx.batch { 27 - stream.write_all(b"OK\n").await?; 21 + tx.send("OK\n".to_string()).await?; 28 22 } 29 23 30 - match ctx.event_sender.send("playlist".to_string()) { 24 + match ctx.event_sender.send(Subsystem::Playlist) { 31 25 Ok(_) => {} 32 26 Err(_) => {} 33 27 } ··· 38 32 pub async fn handle_add( 39 33 ctx: &mut Context, 40 34 request: &str, 41 - stream: &mut BufReader<TcpStream>, 35 + tx: Sender<String>, 42 36 ) -> Result<String, Error> { 43 - let mut idle = ctx.idle.lock().await; 44 - *idle = true; 45 - 46 37 let response = ctx 47 38 .settings 48 39 .get_global_settings(GetGlobalSettingsRequest {}) ··· 55 46 let captures = re.captures(request); 56 47 if captures.is_none() { 57 48 if !ctx.batch { 58 - stream 59 - .write_all(b"ACK [2@0] {add} missing argument\n") 49 + tx.send("ACK [2@0] {add} missing argument\n".to_string()) 60 50 .await?; 61 51 } 62 52 return Ok("ACK [2@0] {add} missing argument\n".to_string()); ··· 71 61 72 62 if path.is_empty() { 73 63 if !ctx.batch { 74 - stream 75 - .write_all(b"ACK [2@0] {add} missing argument\n") 64 + tx.send("ACK [2@0] {add} missing argument\n".to_string()) 76 65 .await?; 77 66 } 78 67 return Ok("ACK [2@0] {add} missing argument\n".to_string()); ··· 86 75 // verify if path is a file or directory or doesn't exist 87 76 if fs::metadata(&path).is_err() { 88 77 if !ctx.batch { 89 - stream 90 - .write_all(b"ACK [50@0] {add} No such file or directory\n") 78 + tx.send("ACK [50@0] {add} No such file or directory\n".to_string()) 91 79 .await?; 92 80 } 93 81 return Ok("ACK [50@0] {add} No such file or directory\n".to_string()); ··· 120 108 } 121 109 122 110 if !ctx.batch { 123 - stream.write_all(b"OK\n").await?; 111 + tx.send("OK\n".to_string()).await?; 124 112 } 125 113 126 - match ctx.event_sender.send("playlist".to_string()) { 114 + match ctx.event_sender.send(Subsystem::Playlist) { 127 115 Ok(_) => {} 128 116 Err(_) => {} 129 117 } ··· 134 122 pub async fn handle_addid( 135 123 ctx: &mut Context, 136 124 request: &str, 137 - stream: &mut BufReader<TcpStream>, 125 + tx: Sender<String>, 138 126 ) -> Result<String, Error> { 139 - let mut idle = ctx.idle.lock().await; 140 - *idle = true; 141 - 142 127 let response = ctx 143 128 .settings 144 129 .get_global_settings(GetGlobalSettingsRequest {}) ··· 154 139 155 140 if captures.is_none() { 156 141 if !ctx.batch { 157 - stream 158 - .write_all(b"ACK [2@0] {add} missing argument\n") 142 + tx.send("ACK [2@0] {add} missing argument\n".to_string()) 159 143 .await?; 160 144 } 161 145 return Ok("ACK [2@0] {add} missing argument\n".to_string()); ··· 170 154 171 155 if path.is_empty() { 172 156 if !ctx.batch { 173 - stream 174 - .write_all(b"ACK [2@0] {add} missing argument\n") 157 + tx.send("ACK [2@0] {add} missing argument\n".to_string()) 175 158 .await?; 176 159 } 177 160 return Ok("ACK [2@0] {add} missing argument\n".to_string()); ··· 185 168 // verify if path is a file or directory or doesn't exist 186 169 if fs::metadata(&path).is_err() { 187 170 if !ctx.batch { 188 - stream 189 - .write_all(b"ACK [50@0] {add} No such file or directory\n") 171 + tx.send("ACK [50@0] {add} No such file or directory\n".to_string()) 190 172 .await?; 191 173 } 192 174 return Ok("ACK [50@0] {add} No such file or directory\n".to_string()); ··· 205 187 if fs::metadata(&path)?.is_dir() { 206 188 // return error if directory, invalid for addid 207 189 if !ctx.batch { 208 - stream 209 - .write_all(b"ACK [2@0] {addid} invalid argument\n") 190 + tx.send("ACK [2@0] {addid} invalid argument\n".to_string()) 210 191 .await?; 211 192 } 212 193 return Ok("ACK [2@0] {addid} invalid argument\n".to_string()); ··· 218 199 } 219 200 220 201 if !ctx.batch { 221 - stream.write_all(b"OK\n").await?; 202 + tx.send("OK\n".to_string()).await?; 222 203 } 223 204 224 - match ctx.event_sender.send("playlist".to_string()) { 205 + match ctx.event_sender.send(Subsystem::Playlist) { 225 206 Ok(_) => {} 226 207 Err(_) => {} 227 208 } ··· 232 213 pub async fn handle_playlistinfo( 233 214 ctx: &mut Context, 234 215 _request: &str, 235 - stream: &mut BufReader<TcpStream>, 216 + tx: Sender<String>, 236 217 ) -> Result<String, Error> { 237 218 let current_playlist = ctx.current_playlist.lock().await; 238 219 239 220 if current_playlist.is_none() { 240 221 if !ctx.batch { 241 - stream.write_all(b"OK\n").await?; 222 + tx.send("OK\n".to_string()).await?; 242 223 } 243 224 return Ok("OK\n".to_string()); 244 225 } ··· 270 251 let response = format!("{}OK\n", response); 271 252 272 253 if !ctx.batch { 273 - stream.write_all(response.as_bytes()).await?; 254 + tx.send(response.clone()).await?; 274 255 } 275 256 276 257 Ok(response) ··· 279 260 pub async fn handle_deleteid( 280 261 ctx: &mut Context, 281 262 request: &str, 282 - stream: &mut BufReader<TcpStream>, 263 + tx: Sender<String>, 283 264 ) -> Result<String, Error> { 284 - let mut idle = ctx.idle.lock().await; 285 - *idle = true; 286 - 287 265 let arg = request.split_whitespace().last(); 288 266 if arg.is_none() { 289 267 if !ctx.batch { 290 - stream 291 - .write_all(b"ACK [2@0] {deleteid} missing argument\n") 268 + tx.send("ACK [2@0] {deleteid} missing argument\n".to_string()) 292 269 .await?; 293 270 } 294 271 return Ok("ACK [2@0] {deleteid} missing argument\n".to_string()); ··· 300 277 Ok(x) => vec![x - 1], 301 278 Err(_) => { 302 279 if !ctx.batch { 303 - stream 304 - .write_all(b"ACK [2@0] {deleteid} invalid argument\n") 280 + tx.send("ACK [2@0] {deleteid} invalid argument\n".to_string()) 305 281 .await?; 306 282 } 307 283 return Ok("ACK [2@0] {deleteid} invalid argument\n".to_string()); ··· 311 287 .remove_tracks(RemoveTracksRequest { positions }) 312 288 .await?; 313 289 if !ctx.batch { 314 - stream.write_all(b"OK\n").await?; 290 + tx.send("OK\n".to_string()).await?; 315 291 } 316 292 317 - match ctx.event_sender.send("playlist".to_string()) { 293 + match ctx.event_sender.send(Subsystem::Playlist) { 318 294 Ok(_) => {} 319 295 Err(_) => {} 320 296 } ··· 324 300 pub async fn handle_delete( 325 301 ctx: &mut Context, 326 302 request: &str, 327 - stream: &mut BufReader<TcpStream>, 303 + tx: Sender<String>, 328 304 ) -> Result<String, Error> { 329 - let mut idle = ctx.idle.lock().await; 330 - *idle = true; 331 - 332 305 let arg = request.split_whitespace().last(); 333 306 if arg.is_none() { 334 307 if !ctx.batch { 335 - stream 336 - .write_all(b"ACK [2@0] {delete} missing argument\n") 308 + tx.send("ACK [2@0] {delete} missing argument\n".to_string()) 337 309 .await?; 338 310 } 339 311 return Ok("ACK [2@0] {delete} missing argument\n".to_string()); ··· 349 321 .remove_tracks(RemoveTracksRequest { positions }) 350 322 .await?; 351 323 if !ctx.batch { 352 - stream.write_all(b"OK\n").await?; 324 + tx.send("OK\n".to_string()).await?; 353 325 } 354 326 return Ok("OK\n".to_string()); 355 327 } ··· 357 329 Ok(x) => vec![x], 358 330 Err(_) => { 359 331 if !ctx.batch { 360 - stream 361 - .write_all(b"ACK [2@0] {delete} invalid argument\n") 332 + tx.send("ACK [2@0] {delete} invalid argument\n".to_string()) 362 333 .await?; 363 334 } 364 335 return Ok("ACK [2@0] {delete} invalid argument\n".to_string()); ··· 368 339 .remove_tracks(RemoveTracksRequest { positions }) 369 340 .await?; 370 341 if !ctx.batch { 371 - stream.write_all(b"OK\n").await?; 342 + tx.send("OK\n".to_string()).await?; 372 343 } 373 344 374 - match ctx.event_sender.send("playlist".to_string()) { 345 + match ctx.event_sender.send(Subsystem::Playlist) { 375 346 Ok(_) => {} 376 347 Err(_) => {} 377 348 } ··· 382 353 pub async fn handle_clear( 383 354 ctx: &mut Context, 384 355 _request: &str, 385 - stream: &mut BufReader<TcpStream>, 356 + tx: Sender<String>, 386 357 ) -> Result<String, Error> { 387 - let mut idle = ctx.idle.lock().await; 388 - *idle = true; 389 - 390 358 ctx.playlist 391 359 .remove_all_tracks(RemoveAllTracksRequest {}) 392 360 .await?; 393 361 if !ctx.batch { 394 - stream.write_all(b"OK\n").await?; 362 + tx.send("OK\n".to_string()).await?; 395 363 } 396 364 397 - match ctx.event_sender.send("playlist".to_string()) { 365 + match ctx.event_sender.send(Subsystem::Playlist) { 398 366 Ok(_) => {} 399 367 Err(_) => {} 400 368 } ··· 405 373 pub async fn handle_move( 406 374 ctx: &mut Context, 407 375 request: &str, 408 - stream: &mut BufReader<TcpStream>, 376 + tx: Sender<String>, 409 377 ) -> Result<String, Error> { 410 378 println!("{}", request); 411 379 if !ctx.batch { 412 - stream.write_all(b"OK\n").await?; 380 + tx.send("OK\n".to_string()).await?; 413 381 } 414 382 Ok("OK\n".to_string()) 415 383 }
+27 -26
crates/mpd/src/handlers/system.rs
··· 1 + use std::os::unix::thread; 2 + 1 3 use anyhow::Error; 2 - use tokio::{ 3 - io::{AsyncWriteExt, BufReader}, 4 - net::TcpStream, 5 - }; 4 + use tokio::sync::mpsc::Sender; 6 5 7 6 use crate::{ 8 7 consts::{COMMANDS, DECODERS}, 9 8 Context, 10 9 }; 11 10 11 + use super::Subsystem; 12 + 12 13 pub async fn handle_idle( 13 - _ctx: &mut Context, 14 + ctx: &mut Context, 14 15 _request: &str, 15 - _stream: &mut BufReader<TcpStream>, 16 + tx: Sender<String>, 16 17 ) -> Result<String, Error> { 17 - // TODO: Implement idle 18 - /* 19 - let idle = ctx.idle.lock().await; 18 + let receiver = ctx.event_receiver.clone(); 20 19 21 - if *idle { 22 - stream 23 - .write_all(b"changed: player\nchanged: playlist\nOK\n") 24 - .await?; 25 - return Ok("changed: player\nchanged: playlist\nOK\n".to_string()); 20 + tokio::spawn(async move { 21 + let mut rx = receiver.lock().await; 22 + while let Ok(event) = rx.recv().await { 23 + if event == Subsystem::NoIdle { 24 + break; 25 + } 26 + tx.send(format!("changed: {}\n", event.to_string())).await?; 26 27 } 27 - */ 28 + Ok::<(), Error>(()) 29 + }); 30 + 28 31 Ok("".to_string()) 29 32 } 30 33 31 34 pub async fn handle_noidle( 32 35 ctx: &mut Context, 33 36 _request: &str, 34 - stream: &mut BufReader<TcpStream>, 37 + tx: Sender<String>, 35 38 ) -> Result<String, Error> { 36 - ctx.idle_state.send(false)?; 37 - let mut idle = ctx.idle.lock().await; 38 - *idle = false; 39 + ctx.event_sender.send(Subsystem::NoIdle)?; 39 40 40 41 let response = "OK\n".to_string(); 41 42 if !ctx.batch { 42 - stream.write_all(response.as_bytes()).await?; 43 + tx.send(response.clone()).await?; 43 44 } 44 45 Ok(response) 45 46 } ··· 47 48 pub async fn handle_decoders( 48 49 ctx: &mut Context, 49 50 _request: &str, 50 - stream: &mut BufReader<TcpStream>, 51 + tx: Sender<String>, 51 52 ) -> Result<String, Error> { 52 53 if !ctx.batch { 53 - stream.write_all(DECODERS.as_bytes()).await?; 54 + tx.send(DECODERS.to_string()).await?; 54 55 } 55 56 Ok(DECODERS.to_string()) 56 57 } ··· 58 59 pub async fn handle_commands( 59 60 ctx: &mut Context, 60 61 _request: &str, 61 - stream: &mut BufReader<TcpStream>, 62 + tx: Sender<String>, 62 63 ) -> Result<String, Error> { 63 64 if !ctx.batch { 64 - stream.write_all(COMMANDS.as_bytes()).await?; 65 + tx.send(COMMANDS.to_string()).await?; 65 66 } 66 67 Ok(COMMANDS.to_string()) 67 68 } ··· 69 70 pub async fn handle_binarylimit( 70 71 ctx: &mut Context, 71 72 _request: &str, 72 - stream: &mut BufReader<TcpStream>, 73 + tx: Sender<String>, 73 74 ) -> Result<String, Error> { 74 75 if !ctx.batch { 75 - stream.write_all("OK\n".as_bytes()).await?; 76 + tx.send("OK\n".to_string()).await?; 76 77 } 77 78 Ok("OK\n".to_string()) 78 79 }
+93 -87
crates/mpd/src/lib.rs
··· 17 17 handle_playlistinfo, handle_shuffle, 18 18 }, 19 19 system::{handle_binarylimit, handle_commands, handle_decoders, handle_idle, handle_noidle}, 20 + Subsystem, 20 21 }; 21 22 use kv::{build_tracks_kv, KV}; 22 23 use rockbox_graphql::{ ··· 36 37 use tokio::{ 37 38 io::{AsyncReadExt, AsyncWriteExt}, 38 39 net::{TcpListener, TcpStream}, 39 - sync::{broadcast, watch, Mutex}, 40 + sync::{broadcast, Mutex}, 40 41 }; 41 42 use tokio_stream::StreamExt; 42 43 use tonic::transport::Channel; ··· 56 57 pub system: SystemServiceClient<Channel>, 57 58 pub single: Arc<Mutex<String>>, 58 59 pub batch: bool, 59 - pub idle_state: Arc<watch::Sender<bool>>, 60 - pub idle_cancel: watch::Receiver<bool>, 61 - pub event_sender: broadcast::Sender<String>, 60 + pub event_sender: broadcast::Sender<Subsystem>, 61 + pub event_receiver: Arc<Mutex<broadcast::Receiver<Subsystem>>>, 62 62 pub current_track: Arc<Mutex<Option<Track>>>, 63 63 pub current_playlist: Arc<Mutex<Option<Playlist>>>, 64 64 pub playback_status: Arc<Mutex<Option<AudioStatus>>>, 65 65 pub pool: Pool<Sqlite>, 66 66 pub kv: Arc<Mutex<KV<entity::track::Track>>>, 67 67 pub current_settings: Arc<Mutex<UserSettings>>, 68 - pub idle: Arc<Mutex<bool>>, 69 68 } 69 + 70 70 pub struct MpdServer {} 71 71 72 72 impl MpdServer { ··· 100 100 101 101 pub async fn handle_client(mut ctx: Context, stream: TcpStream) -> Result<(), Error> { 102 102 let mut buf = [0; 4096]; 103 - let mut stream = tokio::io::BufReader::new(stream); 104 - stream.write_all(b"OK MPD 0.23.15\n").await?; 103 + let (reader_stream, writer_stream) = tokio::io::split(stream); 104 + let mut reader = tokio::io::BufReader::new(reader_stream); 105 + let mut writer = tokio::io::BufWriter::new(writer_stream); 106 + 107 + let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(32); 108 + 109 + tokio::spawn(async move { 110 + while let Some(msg) = rx.recv().await { 111 + writer.write_all(msg.as_bytes()).await?; 112 + writer.flush().await?; 113 + } 114 + Ok::<(), Error>(()) 115 + }); 105 116 106 - while let Ok(n) = stream.read(&mut buf).await { 117 + tx.send("OK MPD 0.23.15\n".to_string()).await?; 118 + 119 + while let Ok(n) = reader.read(&mut buf).await { 107 120 if n == 0 { 108 121 break; 109 122 } ··· 112 125 println!("request: {}", request); 113 126 114 127 match command.as_str() { 115 - "play" => handle_play(&mut ctx, &request, &mut stream).await?, 116 - "pause" => handle_pause(&mut ctx, &request, &mut stream).await?, 117 - "toggle" => handle_toggle(&mut ctx, &request, &mut stream).await?, 118 - "next" => handle_next(&mut ctx, &request, &mut stream).await?, 119 - "previous" => handle_previous(&mut ctx, &request, &mut stream).await?, 120 - "playid" => handle_playid(&mut ctx, &request, &mut stream).await?, 121 - "seek" => handle_seek(&mut ctx, &request, &mut stream).await?, 122 - "seekid" => handle_seekid(&mut ctx, &request, &mut stream).await?, 123 - "seekcur" => handle_seekcur(&mut ctx, &request, &mut stream).await?, 124 - "random" => handle_random(&mut ctx, &request, &mut stream).await?, 125 - "repeat" => handle_repeat(&mut ctx, &request, &mut stream).await?, 126 - "getvol" => handle_getvol(&mut ctx, &request, &mut stream).await?, 127 - "setvol" => handle_setvol(&mut ctx, &request, &mut stream).await?, 128 - "volume" => handle_setvol(&mut ctx, &request, &mut stream).await?, 129 - "single" => handle_single(&mut ctx, &request, &mut stream).await?, 130 - "shuffle" => handle_shuffle(&mut ctx, &request, &mut stream).await?, 131 - "add" => handle_add(&mut ctx, &request, &mut stream).await?, 132 - "addid" => handle_addid(&mut ctx, &request, &mut stream).await?, 133 - "deleteid" => handle_deleteid(&mut ctx, &request, &mut stream).await?, 134 - "playlistinfo" => handle_playlistinfo(&mut ctx, &request, &mut stream).await?, 135 - "delete" => handle_delete(&mut ctx, &request, &mut stream).await?, 136 - "clear" => handle_clear(&mut ctx, &request, &mut stream).await?, 137 - "move" => handle_move(&mut ctx, &request, &mut stream).await?, 138 - "list album" => handle_list_album(&mut ctx, &request, &mut stream).await?, 139 - "list albumartist" => handle_list_artist(&mut ctx, &request, &mut stream).await?, 140 - "list artist" => handle_list_artist(&mut ctx, &request, &mut stream).await?, 141 - "list title" => handle_list_title(&mut ctx, &request, &mut stream).await?, 142 - "update" => handle_rescan(&mut ctx, &request, &mut stream).await?, 143 - "search" => handle_search(&mut ctx, &request, &mut stream).await?, 144 - "rescan" => handle_rescan(&mut ctx, &request, &mut stream).await?, 145 - "status" => handle_status(&mut ctx, &request, &mut stream).await?, 146 - "currentsong" => handle_currentsong(&mut ctx, &request, &mut stream).await?, 147 - "config" => handle_config(&mut ctx, &request, &mut stream).await?, 148 - "tagtypes " => handle_tagtypes(&mut ctx, &request, &mut stream).await?, 149 - "tagtypes clear" => handle_clear(&mut ctx, &request, &mut stream).await?, 150 - "tagtypes enable" => handle_tagtypes_enable(&mut ctx, &request, &mut stream).await?, 151 - "stats" => handle_stats(&mut ctx, &request, &mut stream).await?, 152 - "plchanges" => handle_playlistinfo(&mut ctx, &request, &mut stream).await?, 153 - "outputs" => handle_outputs(&mut ctx, &request, &mut stream).await?, 154 - "idle" => handle_idle(&mut ctx, &request, &mut stream).await?, 155 - "noidle" => handle_noidle(&mut ctx, &request, &mut stream).await?, 156 - "decoders" => handle_decoders(&mut ctx, &request, &mut stream).await?, 157 - "lsinfo" => handle_lsinfo(&mut ctx, &request, &mut stream).await?, 158 - "listall" => handle_listall(&mut ctx, &request, &mut stream).await?, 159 - "listallinfo" => handle_listallinfo(&mut ctx, &request, &mut stream).await?, 160 - "listfiles" => handle_listfiles(&mut ctx, &request, &mut stream).await?, 161 - "find artist" => handle_find_artist(&mut ctx, &request, &mut stream).await?, 162 - "find album" => handle_find_album(&mut ctx, &request, &mut stream).await?, 163 - "find title" => handle_find_title(&mut ctx, &request, &mut stream).await?, 164 - "binarylimit" => handle_binarylimit(&mut ctx, &request, &mut stream).await?, 165 - "commands" => handle_commands(&mut ctx, &request, &mut stream).await?, 128 + "play" => handle_play(&mut ctx, &request, tx.clone()).await?, 129 + "pause" => handle_pause(&mut ctx, &request, tx.clone()).await?, 130 + "toggle" => handle_toggle(&mut ctx, &request, tx.clone()).await?, 131 + "next" => handle_next(&mut ctx, &request, tx.clone()).await?, 132 + "previous" => handle_previous(&mut ctx, &request, tx.clone()).await?, 133 + "playid" => handle_playid(&mut ctx, &request, tx.clone()).await?, 134 + "seek" => handle_seek(&mut ctx, &request, tx.clone()).await?, 135 + "seekid" => handle_seekid(&mut ctx, &request, tx.clone()).await?, 136 + "seekcur" => handle_seekcur(&mut ctx, &request, tx.clone()).await?, 137 + "random" => handle_random(&mut ctx, &request, tx.clone()).await?, 138 + "repeat" => handle_repeat(&mut ctx, &request, tx.clone()).await?, 139 + "getvol" => handle_getvol(&mut ctx, &request, tx.clone()).await?, 140 + "setvol" => handle_setvol(&mut ctx, &request, tx.clone()).await?, 141 + "volume" => handle_setvol(&mut ctx, &request, tx.clone()).await?, 142 + "single" => handle_single(&mut ctx, &request, tx.clone()).await?, 143 + "shuffle" => handle_shuffle(&mut ctx, &request, tx.clone()).await?, 144 + "add" => handle_add(&mut ctx, &request, tx.clone()).await?, 145 + "addid" => handle_addid(&mut ctx, &request, tx.clone()).await?, 146 + "deleteid" => handle_deleteid(&mut ctx, &request, tx.clone()).await?, 147 + "playlistinfo" => handle_playlistinfo(&mut ctx, &request, tx.clone()).await?, 148 + "delete" => handle_delete(&mut ctx, &request, tx.clone()).await?, 149 + "clear" => handle_clear(&mut ctx, &request, tx.clone()).await?, 150 + "move" => handle_move(&mut ctx, &request, tx.clone()).await?, 151 + "list album" => handle_list_album(&mut ctx, &request, tx.clone()).await?, 152 + "list albumartist" => handle_list_artist(&mut ctx, &request, tx.clone()).await?, 153 + "list artist" => handle_list_artist(&mut ctx, &request, tx.clone()).await?, 154 + "list title" => handle_list_title(&mut ctx, &request, tx.clone()).await?, 155 + "update" => handle_rescan(&mut ctx, &request, tx.clone()).await?, 156 + "search" => handle_search(&mut ctx, &request, tx.clone()).await?, 157 + "rescan" => handle_rescan(&mut ctx, &request, tx.clone()).await?, 158 + "status" => handle_status(&mut ctx, &request, tx.clone()).await?, 159 + "currentsong" => handle_currentsong(&mut ctx, &request, tx.clone()).await?, 160 + "config" => handle_config(&mut ctx, &request, tx.clone()).await?, 161 + "tagtypes " => handle_tagtypes(&mut ctx, &request, tx.clone()).await?, 162 + "tagtypes clear" => handle_clear(&mut ctx, &request, tx.clone()).await?, 163 + "tagtypes enable" => handle_tagtypes_enable(&mut ctx, &request, tx.clone()).await?, 164 + "stats" => handle_stats(&mut ctx, &request, tx.clone()).await?, 165 + "plchanges" => handle_playlistinfo(&mut ctx, &request, tx.clone()).await?, 166 + "outputs" => handle_outputs(&mut ctx, &request, tx.clone()).await?, 167 + "idle" => handle_idle(&mut ctx, &request, tx.clone()).await?, 168 + "noidle" => handle_noidle(&mut ctx, &request, tx.clone()).await?, 169 + "decoders" => handle_decoders(&mut ctx, &request, tx.clone()).await?, 170 + "lsinfo" => handle_lsinfo(&mut ctx, &request, tx.clone()).await?, 171 + "listall" => handle_listall(&mut ctx, &request, tx.clone()).await?, 172 + "listallinfo" => handle_listallinfo(&mut ctx, &request, tx.clone()).await?, 173 + "listfiles" => handle_listfiles(&mut ctx, &request, tx.clone()).await?, 174 + "find artist" => handle_find_artist(&mut ctx, &request, tx.clone()).await?, 175 + "find album" => handle_find_album(&mut ctx, &request, tx.clone()).await?, 176 + "find title" => handle_find_title(&mut ctx, &request, tx.clone()).await?, 177 + "binarylimit" => handle_binarylimit(&mut ctx, &request, tx.clone()).await?, 178 + "commands" => handle_commands(&mut ctx, &request, tx.clone()).await?, 166 179 "command_list_begin" => { 167 - handle_command_list_begin(&mut ctx, &request, &mut stream).await? 180 + handle_command_list_begin(&mut ctx, &request, tx.clone()).await? 168 181 } 169 182 "command_list_ok_begin" => { 170 - handle_command_list_ok_begin(&mut ctx, &request, &mut stream).await? 183 + handle_command_list_ok_begin(&mut ctx, &request, tx.clone()).await? 171 184 } 172 185 _ => { 173 186 if command.starts_with("find ") { 174 - handle_find(&mut ctx, &request, &mut stream).await?; 187 + handle_find(&mut ctx, &request, tx.clone()).await?; 175 188 return Ok(()); 176 189 } 177 190 println!("Unhandled command: {}", command); 178 191 println!("Unhandled request: {}", request); 179 - stream 180 - .write_all(b"ACK [5@0] {unhandled} unknown command\n") 192 + tx.send("ACK [5@0] {unhandled} unknown command\n".to_string()) 181 193 .await?; 182 194 "ACK [5@0] {unhandled} unknown command\n".to_string() 183 195 } ··· 223 235 let playlist = PlaylistServiceClient::connect(url.clone()).await?; 224 236 let system = SystemServiceClient::connect(url.clone()).await?; 225 237 226 - let (event_sender, _) = broadcast::channel(16); 227 - let (idle_state, idle_cancel) = watch::channel(false); 238 + let (event_sender, event_receiver) = broadcast::channel(16); 228 239 229 240 Ok(Context { 230 241 library, ··· 235 246 system, 236 247 single: Arc::new(Mutex::new("\"0\"".to_string())), 237 248 batch, 238 - idle_state: match ctx { 239 - Some(ref ctx) => ctx.clone().idle_state, 240 - None => Arc::new(idle_state), 241 - }, 242 - idle_cancel: match ctx { 243 - Some(ref ctx) => ctx.clone().idle_cancel, 244 - None => idle_cancel, 245 - }, 246 249 event_sender: match ctx { 247 250 Some(ref ctx) => ctx.clone().event_sender, 248 251 None => event_sender, 252 + }, 253 + event_receiver: match ctx { 254 + Some(ref ctx) => ctx.clone().event_receiver, 255 + None => Arc::new(Mutex::new(event_receiver)), 249 256 }, 250 257 current_track: match ctx { 251 258 Some(ref ctx) => ctx.clone().current_track, ··· 262 269 pool, 263 270 kv, 264 271 current_settings: Arc::new(Mutex::new(rockbox_sys::settings::get_global_settings())), 265 - idle: Arc::new(Mutex::new(false)), 266 272 }) 267 273 } 268 274 ··· 304 310 || current_playlist.is_none() 305 311 { 306 312 let ctx = ctx_clone.clone(); 307 - thread::spawn(move || { 308 - thread::sleep(std::time::Duration::from_millis(500)); 309 - let rt = tokio::runtime::Runtime::new().unwrap(); 310 - let mut idle = rt.block_on(ctx.idle.lock()); 311 - *idle = true; 312 - }); 313 + match ctx.event_sender.send(Subsystem::Playlist) { 314 + Ok(_) => {} 315 + Err(e) => { 316 + eprintln!("Error: {}", e) 317 + } 318 + } 313 319 } 314 320 315 321 *current_playlist = Some(playlist); ··· 327 333 && playback_status.as_ref().unwrap().status != status.status 328 334 { 329 335 let ctx = another_ctx.clone(); 330 - thread::spawn(move || { 331 - thread::sleep(std::time::Duration::from_millis(500)); 332 - let rt = tokio::runtime::Runtime::new().unwrap(); 333 - let mut idle = rt.block_on(ctx.idle.lock()); 334 - *idle = true; 335 - }); 336 + match ctx.event_sender.send(Subsystem::Player) { 337 + Ok(_) => {} 338 + Err(e) => { 339 + eprintln!("Error: {}", e) 340 + } 341 + } 336 342 } 337 343 *playback_status = Some(status); 338 344 }