WIP push-to-talk Letta chat frontend
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

clean up STT implementation

+151 -38
+148 -34
src-tauri/src/cartesia/stt.rs
··· 6 6 stream::{SplitSink, SplitStream}, 7 7 SinkExt, StreamExt, 8 8 }; 9 + use serde::Deserialize; 9 10 use std::sync::Arc; 10 - use tauri::async_runtime::Mutex; 11 + use tauri::async_runtime::{Mutex, RwLock}; 11 12 use tokio::net::TcpStream; 12 13 use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; 13 - use tungstenite::{Bytes, Message}; 14 + use tungstenite::{Bytes, Error, Message, Utf8Bytes}; 15 + 16 + #[derive(Debug)] 17 + enum SttStatus { 18 + Idle, 19 + Opening, 20 + Connected, 21 + Flushing, 22 + Done, 23 + Error, 24 + } 14 25 15 26 pub struct SttManager { 27 + status: Arc<RwLock<SttStatus>>, 16 28 client: Arc<CartesiaClient>, 17 29 input: Arc<InputDeviceManager>, 18 - is_transcribing: Arc<Mutex<bool>>, 30 + } 31 + 32 + #[derive(Deserialize, Debug)] 33 + struct TranscriptionWord { 34 + word: String, 35 + start: f32, 36 + end: f32, 37 + } 38 + 39 + #[derive(Deserialize, Debug)] 40 + #[serde(untagged)] 41 + enum TranscriptionMessage { 42 + Transcript { 43 + #[serde(rename = "type")] 44 + message_type: String, 45 + request_id: String, 46 + is_final: bool, 47 + text: String, 48 + duration: Option<f32>, 49 + language: Option<String>, 50 + words: Vec<TranscriptionWord>, 51 + }, 52 + Error { 53 + #[serde(rename = "type")] 54 + message_type: String, 55 + message: String, 56 + request_id: Option<String>, 57 + }, 58 + Done { 59 + #[serde(rename = "type")] 60 + message_type: String, 61 + request_id: String, 62 + }, 19 63 } 20 64 21 65 impl SttManager { 22 66 pub fn new(client: Arc<CartesiaClient>, input: Arc<InputDeviceManager>) -> Self { 23 67 Self { 68 + status: Arc::new(RwLock::new(SttStatus::Idle)), 24 69 client, 25 70 input, 26 - is_transcribing: Arc::new(Mutex::new(false)), 27 71 } 28 72 } 29 73 ··· 32 76 mut reader: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>, 33 77 ) { 34 78 println!("handling messages"); 79 + 35 80 while let Some(message) = reader.next().await { 36 - let is_transcribing = self.is_transcribing.lock().await; 81 + match message { 82 + Ok(Message::Text(msg)) => { 83 + println!("got message: {}", msg); 37 84 38 - if *is_transcribing { 39 - println!("message from STT: {:?}", message) 40 - } else { 41 - println!("finished transcribing; breaking message handling"); 42 - break; 85 + match serde_json::from_str::<TranscriptionMessage>(msg.as_str()) { 86 + Ok(TranscriptionMessage::Transcript { 87 + message_type, 88 + request_id, 89 + is_final, 90 + text, 91 + duration, 92 + language, 93 + words, 94 + }) => { 95 + println!("transcribed: {:?}", text); 96 + } 97 + Ok(TranscriptionMessage::Done { 98 + message_type, 99 + request_id, 100 + }) => { 101 + println!("recieved done message, stopping"); 102 + self.stop_transcription().await; 103 + break; 104 + } 105 + Ok(msg) => { 106 + println!("received a message: {:?}", msg) 107 + } 108 + Err(err) => { 109 + eprintln!("failed to parse message: {}", err); 110 + self.stop_transcription().await; 111 + break; 112 + } 113 + } 114 + } 115 + Ok(msg) => { 116 + println!("got non-text message: {}", msg); 117 + } 118 + Err(Error::ConnectionClosed) | Err(Error::AlreadyClosed) => { 119 + eprintln!("connection closed"); 120 + break; 121 + } 122 + Err(err) => { 123 + eprintln!("err with connection: {}", err); 124 + break; 125 + } 43 126 } 44 127 } 45 128 } ··· 47 130 async fn send_frames( 48 131 &self, 49 132 signal: impl Signal<Frame = i16>, 50 - mut writer: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>, 133 + writer: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>, 51 134 ) { 135 + let mut w = writer.lock().await; 52 136 let mut buffer = Vec::with_capacity(3200); 53 - println!("sending frames"); 137 + 54 138 for frame in signal.until_exhausted() { 55 - let is_transcribing = self.is_transcribing.lock().await; 139 + let status = self.status.read().await; 56 140 57 - if *is_transcribing { 58 - buffer.push(frame.to_le_bytes()); 141 + match *status { 142 + SttStatus::Connected => { 143 + buffer.push(frame.to_le_bytes()); 59 144 60 - if buffer.len() == buffer.capacity() { 61 - println!("sending {} frames", buffer.len()); 62 - writer 63 - .send(tungstenite::Message::Binary(Bytes::from_iter( 145 + if buffer.len() == buffer.capacity() { 146 + w.send(tungstenite::Message::Binary(Bytes::from_iter( 64 147 buffer.iter().flat_map(|f| *f), 65 148 ))) 66 149 .await 67 150 .expect("failed to send binary frame message to STT"); 68 151 69 - buffer.clear(); 152 + buffer.clear(); 153 + } 154 + } 155 + SttStatus::Flushing => { 156 + println!("flushing audio data"); 157 + 158 + w.send(tungstenite::Message::Binary(Bytes::from_iter( 159 + buffer.iter().flat_map(|f| *f), 160 + ))) 161 + .await 162 + .expect("failed to send binary frame message to STT"); 163 + 164 + w.send(Message::Text(Utf8Bytes::from_static("done"))) 165 + .await 166 + .expect("failed to send done message"); 167 + 168 + let _ = w.flush().await; 169 + break; 70 170 } 71 - } else { 72 - println!("stopping frame send"); 73 - break; 171 + _ => (), 74 172 } 75 173 } 76 174 } 77 175 78 176 /// Begins transcribing text via Cartesia. Blocks until `stop_transcription` is called 79 177 pub async fn transcribe(&self) { 80 - let stream = self.client.open_stt_connection().await; 178 + println!("starting transcription"); 179 + 180 + { 181 + let mut status = self.status.write().await; 182 + *status = SttStatus::Opening; 183 + } 184 + 81 185 let input = self.input.start_listening().await; 82 - let (writer, reader) = stream.split(); 186 + let stream = self.client.open_stt_connection().await; 187 + let (tx, rx) = stream.split(); 188 + let writer = Arc::new(Mutex::new(tx)); 83 189 84 190 { 85 - let mut is_transcribing = self.is_transcribing.lock().await; 86 - *is_transcribing = true; 191 + let mut status = self.status.write().await; 192 + *status = SttStatus::Connected; 87 193 } 88 194 89 195 // Handle incoming messages 90 - let read = self.handle_messages(reader); 91 - let write = self.send_frames(input, writer); 196 + let read = self.handle_messages(rx); 197 + let write = self.send_frames(input, writer.clone()); 92 198 93 199 join(read, write).await; 94 200 95 - stream.close(); 201 + writer 202 + .lock() 203 + .await 204 + .send(Message::Close(None)) 205 + .await 206 + .expect("failed to close socket"); 207 + } 208 + 209 + async fn flush_audio(&self) { 210 + let mut status = self.status.write().await; 211 + *status = SttStatus::Flushing; 96 212 } 97 213 98 214 /// Terminates the microphone signal and halts the transcription processes 99 215 pub async fn stop_transcription(&self) { 100 - { 101 - let mut is_transcribing = self.is_transcribing.lock().await; 102 - *is_transcribing = false; 103 - } 216 + println!("stopping transcription"); 104 217 218 + self.flush_audio().await; 105 219 self.input.stop_listening().await; 106 220 } 107 221 }
+3 -4
src-tauri/src/devices/input.rs
··· 38 38 } 39 39 40 40 pub async fn start_listening(&self) -> impl Signal<Frame = i16> { 41 + println!("opening device stream"); 41 42 let config = self 42 43 .device 43 44 .supported_input_configs() ··· 56 57 .chunks(channels) 57 58 .map(|frame| (frame.iter().sum::<i16>() / channels as i16).to_le()); 58 59 59 - let _ = tx 60 - .send(mono.collect()) 61 - .inspect(|_| println!("successfully sent frame")) 62 - .inspect_err(|e| eprintln!("failed to send frame: {e}")); 60 + let _ = tx.send(mono.collect()); 63 61 }, 64 62 |err| eprintln!("encountered error streaming input: {}", err), 65 63 None, ··· 78 76 } 79 77 80 78 pub async fn stop_listening(&self) { 79 + println!("dropping device stream"); 81 80 let mut s = self.stream.lock().await; 82 81 *s = None; 83 82 }