use std::{io::Cursor, sync::Arc};

use base64::prelude::{Engine as _, BASE64_STANDARD};
use tauri::async_runtime::spawn;
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedReceiver},
    Mutex, RwLock,
};

use crate::{
    cartesia::tts::{TtsContext, TtsInputMessage, TtsManager, TtsMessage},
    conversation::types::{Turn, TurnMessage},
    letta::{
        types::{LettaCompletionMessage, LettaMessageContent},
        LettaManager,
    },
};

mod types;

// NOTE(review): the generic arguments in this file appear to be missing
// (e.g. `Option>>`, `RwLock,`, `Arc,`, `unbounded_channel::()`, `-> Option`).
// As written these do not parse; restore them before building.

/// Coordinates one conversation turn at a time.
///
/// It hands a prompt to Letta and spawns a background task that consumes the
/// streamed completion messages into a shared `Turn`.
pub struct ConversationManager {
    /// The turn currently in progress, shared with the spawned handler task.
    /// `None` means the manager is idle.
    // NOTE(review): nothing in this file ever sets this back to `None`, so
    // after the first `start_turn` call `is_idle()` keeps returning `false`
    // and every later `start_turn` is silently ignored.
    turn: Option>>,
    /// Reset to 0 at the start of every turn. It is not read anywhere in this
    /// file.
    current_msg_index: RwLock,
    /// Handed to the handler task, which uses it to start the completion stream.
    letta_manager: Arc,
    /// Handed to the handler task (currently unused there).
    tts_manager: Arc,
}

impl ConversationManager {
    /// Creates an idle manager: no active turn, message index 0.
    pub fn new(letta_manager: Arc, tts_manager: Arc) -> Self {
        Self {
            turn: None,
            current_msg_index: RwLock::new(0),
            letta_manager,
            tts_manager,
        }
    }

    /// Returns `true` when no turn is stored.
    pub fn is_idle(&self) -> bool {
        self.turn.is_none()
    }

    /// Start a new conversation turn for `prompt`.
    ///
    /// If a turn is already stored, the call returns immediately and does
    /// nothing. Otherwise it:
    /// - resets the message index to 0,
    /// - stores a fresh `Turn`,
    /// - spawns `handle_letta_messages` on the Tauri async runtime.
    ///
    /// It does not wait for the completion. The value returned by `spawn` is
    /// discarded.
    pub async fn start_turn(&mut self, prompt: String) {
        if !self.is_idle() {
            return;
        };

        let turn = Arc::new(RwLock::new(Turn::new()));
        {
            let mut idx = self.current_msg_index.write().await;
            *idx = 0;
            self.turn = Some(turn.clone());
        }

        // The task gets its own handle to the same `Turn` that `self.turn` holds.
        spawn(handle_letta_messages(
            prompt,
            self.letta_manager.clone(),
            self.tts_manager.clone(),
            turn.clone(),
        ));
    }
}

/// Background task for one turn.
///
/// It starts a Letta completion for `prompt` and receives the streamed
/// messages one by one. The intended routing (still stubbed out) is to append
/// each message to the latest message in `turn`, or create a new one.
///
/// If the completion cannot be started, the error is printed to stderr and the
/// task ends.
// NOTE(review): `tts` is not used in this function yet.
async fn handle_letta_messages(
    prompt: String,
    letta: Arc,
    tts: Arc,
    turn: Arc>,
) {
    match letta.start_completion(prompt).await {
        Ok(mut iter) => {
            while let Some(msg) = iter.recv().await {
                // A read lock is taken per incoming message; it shadows the
                // outer `turn` handle for the rest of this iteration.
                // NOTE(review): creating a new message (the `None` branch)
                // presumably needs mutable access, e.g. `turn.write()`.
                // TODO confirm against `Turn`'s API.
                let turn = turn.read().await;
                match turn.latest() {
                    // NOTE(review): this inner match is not exhaustive. A
                    // `TextMessage` whose `id` differs from the incoming
                    // message matches no arm (E0004). That case presumably
                    // should start a new message.
                    Some(latest) => match latest {
                        // NOTE(review): `create_turn_message_from_letta` matches
                        // `LettaCompletionMessage` as an enum with per-variant
                        // `id` fields. If that is its real shape, `msg.id` is
                        // not valid field access and an accessor is needed.
                        // TODO confirm.
                        TurnMessage::TextMessage {
                            id,
                            reader: _,
                            writer,
                        } if id == msg.id => {
                            // Add current message to chunks
                        }
                        // NOTE(review): unlike the `TextMessage` arm there is
                        // no `id` guard here, so every incoming message would
                        // go to the latest audio message regardless of its id.
                        TurnMessage::AudioMessage {
                            id,
                            reader: _,
                            writer,
                            context,
                            cursor: _,
                            timestamps: _,
                        } => {
                            // Add current message to chunks
                        }
                    },
                    None => {
                        // Create a new message
                        // Append message to chunks
                    }
                }
            }
        }
        Err(err) => eprintln!("failed to start completion: {}", err),
    }
}

/// Builds a `TurnMessage` seeded with `src`.
///
/// It creates an unbounded channel and pushes a clone of `src` into it as the
/// first item. What comes back depends on the variant:
/// - Approval request/response, hidden-reasoning, system, tool-call and
///   tool-return messages become a `TextMessage` holding the channel ends.
/// - Assistant messages have their text content blocks concatenated and sent
///   to a new TTS context. A background task is spawned to consume that
///   context's output, and an `AudioMessage` is returned.
/// - Any other variant returns `None`, and the channel is dropped.
///
/// # Panics
///
/// Panics if `tts.new_context` fails.
///
/// Note: this function is not called anywhere in the code shown in this file.
async fn create_turn_message_from_letta(
    src: LettaCompletionMessage,
    tts: Arc,
) -> Option {
    let (writer, reader) = unbounded_channel::();
    // NOTE(review): the `send` result is ignored (a `#[must_use]` warning).
    // It cannot fail here because `reader` is still alive in this scope.
    writer.send(src.clone());
    match src {
        LettaCompletionMessage::ApprovalRequestMessage { id, .. }
        | LettaCompletionMessage::ApprovalResponseMessage { id, .. }
        | LettaCompletionMessage::HiddenReasoningMessage { id, .. }
        | LettaCompletionMessage::SystemMessage { id, .. }
        | LettaCompletionMessage::ToolCallMessage { id, .. }
        | LettaCompletionMessage::ToolReturnMessage { id, .. } => {
            Some(TurnMessage::TextMessage { id, reader, writer })
        }
        LettaCompletionMessage::AssistantMessage {
            id,
            content: blocks,
            ..
        } => {
            // Buffer intended to receive the decoded audio bytes.
            let cursor = Cursor::new(Vec::new());

            // Concatenate only the text blocks; other block kinds are skipped.
            let mut content = "".to_owned();
            for b in blocks {
                match b {
                    LettaMessageContent::Text { text } => content.push_str(&text),
                    _ => (),
                }
            }

            let context = tts
                .new_context(id.clone())
                .await
                .expect("failed to create new TTS context");
            // NOTE(review): if `send` returns a `Result`, it is ignored here.
            // TODO confirm.
            context.send(content, false).await;

            // Spawn task for handling audio generation.
            // The async closure is invoked immediately with the context's
            // shared receiver; an `async move` block would be equivalent.
            // NOTE(review): this does not compile as written:
            // - `cursor` is not `mut`, but `cursor.get_mut()` needs a mutable
            //   borrow;
            // - the spawned future must be `'static`, yet it borrows `cursor`;
            // - `cursor` is also moved into the returned `AudioMessage` below.
            // The audio buffer needs shared ownership (e.g. behind
            // `Arc<Mutex<..>>`). TODO.
            spawn((async |reader: Arc<
                Mutex>,
            >| {
                // Listen to context and append to cursor.
                // The guard from `reader.lock()` is a temporary in the
                // `while let` scrutinee, so it stays held through each loop
                // body, not just during `recv()`.
                while let Some(msg) = reader.lock().await.recv().await {
                    match msg {
                        TtsMessage::Chunk { data, .. } => {
                            // Decode chunk and write to cursor.
                            // `decode_vec` appends to the buffer.
                            // NOTE(review): `out` (the decode `Result`) is never
                            // checked, so malformed chunks are silently dropped.
                            let out = BASE64_STANDARD.decode_vec(data, cursor.get_mut());
                        }
                        TtsMessage::Timestamps {
                            word_timestamps, ..
                        } => {
                            // Append timestamps
                            // NOTE(review): not implemented; the `timestamps`
                            // vector in the returned message is never filled.
                        }
                        _ => (),
                    }
                }
            })(context.reader.clone()));

            Some(TurnMessage::AudioMessage {
                id: id.clone(),
                reader,
                writer,
                context,
                cursor,
                timestamps: Vec::new(),
            })
        }
        _ => None,
    }
}