A local-first private AI assistant for everyday use. Runs on-device models with encrypted P2P sync, and supports sharing chats publicly on ATProto.
10
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: Streaming from PI + refactoring

+176 -65
+6 -3
tiles/src/daemon.rs
··· 58 58 app_vsn, 59 59 daemon_current_vsn 60 60 ); 61 - if app_vsn 62 - .cmp_precedence(&Version::parse(&daemon_current_vsn)?) 63 - .is_ne() 61 + // TODO: "its me check is for older versions, where there's no concept of 62 + // vsn in state, but better way is to change the api altogether" 63 + if daemon_current_vsn.contains("Its me") 64 + || app_vsn 65 + .cmp_precedence(&Version::parse(&daemon_current_vsn)?) 66 + .is_ne() 64 67 { 65 68 log::info!( 66 69 "New app version found {}, hot reload the daemon {}",
+170 -62
tiles/src/runtime/mlx.rs
··· 19 19 use rustyline::{Config, Editor, Helper}; 20 20 use serde::{Deserialize, Serialize}; 21 21 use serde_json::{Value, json}; 22 + use std::cell::{Cell, RefCell}; 23 + use std::collections::HashMap; 22 24 use std::fs::OpenOptions; 23 - use std::io::{BufRead, BufReader, Write}; 25 + use std::io::{BufRead, BufReader, Read, Write}; 24 26 use std::path::PathBuf; 25 - use std::process::{Child, Command}; 27 + use std::process::{Child, ChildStdout, Command}; 26 28 use std::process::{ChildStdin, Stdio}; 29 + use std::rc::Rc; 27 30 use std::time::Duration; 28 31 use tilekit::modelfile::Modelfile; 29 32 use tilekit::modelfile::Role; ··· 65 68 } 66 69 67 70 #[derive(Serialize, Deserialize, Debug)] 68 - struct PiResponse { 69 - r#type: String, 70 - command: String, 71 - success: bool, 72 - data: Option<Value>, 71 + #[serde(tag = "type")] 72 + enum PiResponse { 73 + #[serde(rename = "response")] 74 + Response(PiResponseMessage), 75 + #[serde(rename = "agent_start")] 76 + AgentStart, 77 + #[serde(rename = "message_update")] 78 + MessageUpdate(PiMessageUpdate), 79 + #[serde(rename = "agent_end")] 80 + AgentEnd, 81 + #[serde[other]] 82 + Unknown, 73 83 } 74 84 75 - #[derive(Serialize, Deserialize)] 85 + #[derive(Serialize, Deserialize, Debug)] 76 86 struct GetStateData { 77 87 model: Value, 78 88 #[serde(rename = "thinkingLevel")] ··· 82 92 #[serde(rename = "sessionId")] 83 93 session_id: String, 84 94 } 95 + 96 + #[derive(Serialize, Deserialize, Debug)] 97 + struct PiMessageUpdate { 98 + #[serde(rename = "assistantMessageEvent")] 99 + assistant_message_event: PiAsstTextMsg, 100 + } 101 + 102 + #[derive(Serialize, Deserialize, Debug)] 103 + struct PiAsstTextMsg { 104 + r#type: String, 105 + delta: Option<String>, 106 + } 107 + 108 + #[derive(Serialize, Deserialize, Debug)] 109 + struct PiResponseMessage { 110 + command: CommandType, 111 + success: bool, 112 + data: Option<Value>, 113 + } 114 + 85 115 impl Default for MLXRuntime { 86 116 fn default() -> Self { 87 117 Self::new() ··· 201 231 202 232 impl Helper for TilesHinter {} 203 233 204 - enum SlashCommand { 205 - Continue, 234 + enum InputType { 235 + Skip, 236 + Command(String), 206 237 Exit, 207 - State, 208 - NotACommand, 238 + Prompt, 209 239 } 210 240 211 - fn handle_slash_command(input: &str, modelname: &str) -> SlashCommand { 241 + #[derive(Deserialize, Serialize, Debug)] 242 + enum CommandType { 243 + #[serde(rename = "get_state")] 244 + State, 245 + #[serde(other)] 246 + Unknown, 247 + } 248 + fn handle_input(input: &str, modelname: &str) -> InputType { 212 249 if let Some(cmd) = input.strip_prefix('/') { 213 250 match cmd { 214 251 "help" | "?" => { 215 252 show_help(modelname); 216 - SlashCommand::Continue 253 + InputType::Skip 217 254 } 218 - "state" => SlashCommand::State, 219 - "bye" => SlashCommand::Exit, 255 + "bye" => InputType::Exit, 220 256 "" => { 221 257 println!("Empty command. Type /help for available commands."); 222 - SlashCommand::Continue 258 + InputType::Skip 223 259 } 224 - _ => { 225 - println!( 226 - "Unknown command: /{}. Type /help for available commands.", 227 - cmd 228 - ); 229 - SlashCommand::Continue 230 - } 260 + cmd => InputType::Command(cmd.to_owned()), 231 261 } 232 262 } else { 233 - SlashCommand::NotACommand 263 + InputType::Prompt 234 264 } 235 265 } 236 266 ··· 296 326 let mut pi_process = start_pi_rpc()?; 297 327 298 328 let pi_stdin = pi_process.stdin.as_mut().unwrap(); 329 + let mut stdout = pi_process.stdout.take().expect("stdout"); 330 + // let mut stdout: Cell<ChildStdout> = Cell::new(); 299 331 loop { 300 332 let readline = editor.readline(">>> "); 301 333 let input = match readline { 302 334 Ok(line) => line.trim().to_string(), 303 335 Err(_) => { 304 336 // User pressed Ctrl+C or Ctrl+D 337 + let end_payload = json!({ 338 + "type": "abort", 339 + }); 340 + let payload_str = format!("{}\n", serde_json::to_string(&end_payload)?); 341 + pi_stdin.write_all(payload_str.as_bytes())?; 342 + pi_stdin.flush()?; 305 343 println!("Exiting interactive mode"); 306 344 if !cfg!(debug_assertions) { 307 345 let _res = mlx_runtime.stop_server_daemon().await; ··· 310 348 } 311 349 }; 312 350 313 - match handle_slash_command(&input, modelname.as_str()) { 314 - SlashCommand::Continue => continue, 315 - SlashCommand::Exit => { 351 + if input.is_empty() { 352 + continue; 353 + } 354 + match handle_input(&input, modelname.as_str()) { 355 + InputType::Skip => continue, 356 + InputType::Exit => { 357 + let end_payload = json!({ 358 + "type": "abort", 359 + }); 360 + let payload_str = format!("{}\n", serde_json::to_string(&end_payload)?); 361 + pi_stdin.write_all(payload_str.as_bytes())?; 362 + pi_stdin.flush()?; 316 363 println!("Exiting interactive mode"); 317 364 if !cfg!(debug_assertions) { 318 365 let _res = mlx_runtime.stop_server_daemon().await; 319 366 } 320 367 break; 321 368 } 322 - SlashCommand::State => { 323 - send_pi_command(pi_stdin, "e")?; 369 + InputType::Prompt => { 370 + let payload = json!({ 371 + "type": "prompt", 372 + "message": input 373 + }); 374 + send_to_pi(pi_stdin, payload)?; 324 375 } 325 - SlashCommand::NotACommand => {} 376 + InputType::Command(cmd) => { 377 + let cmd_json = json!(cmd); 378 + let command: CommandType = serde_json::from_value(cmd_json)?; 379 + match command { 380 + CommandType::Unknown => { 381 + println!( 382 + "Unknown command: /{}. Type /help for available commands.", 383 + cmd 384 + ); 385 + continue; 386 + } 387 + cmd_type => { 388 + let payload = get_command_payload(cmd_type); 389 + send_to_pi(pi_stdin, payload) 390 + .inspect_err(|_e| eprintln!("send pi failed"))?; 391 + } 392 + } 393 + } 326 394 } 327 395 328 - if input.is_empty() { 329 - continue; 330 - } 331 396 let mut remaining_count = run_args.relay_count; 332 397 let mut python_code: String = "".to_owned(); 333 398 let mut bench_metrics: BenchmarkMetrics = BenchmarkMetrics { ··· 336 401 tokens_per_second: 0.0, 337 402 total_latency_s: 0.0, 338 403 }; 404 + let mut is_agent_streaming: bool = false; 405 + let reader = BufReader::new(&mut stdout); 339 406 340 - let stdout = pi_process.stdout.take().expect("stdout"); 341 - let reader = BufReader::new(stdout); 342 407 for line in reader.lines() { 343 - let line = line.unwrap(); 344 - if let Some(pi_response) = serde_json::from_str::<PiResponse>(&line)?.into() { 345 - if pi_response.command == "get_state" && pi_response.success { 346 - let data: GetStateData = serde_json::from_value(pi_response.data.unwrap())?; 347 - let render = format!( 348 - "Model: {}\n,thinking: {}\n, session_id: {}", 349 - data.model.get("name").unwrap(), 350 - data.thinking_level, 351 - data.session_id 352 - ); 353 - println!("{}", render); 354 - } else { 355 - println!("got line {}", line); 408 + //TODO: handle the unwrap 409 + let line = line?; 410 + let response: PiResponse = serde_json::from_str(&line)?; 411 + 412 + match response { 413 + PiResponse::AgentStart => { 414 + // agent streaming started 415 + is_agent_streaming = true 356 416 } 357 - break; 358 - } else { 359 - break; 417 + PiResponse::MessageUpdate(msg_update) => { 418 + if msg_update.assistant_message_event.r#type == "text_delta" 419 + && msg_update.assistant_message_event.delta.is_some() 420 + { 421 + print!("{}", msg_update.assistant_message_event.delta.unwrap()); 422 + // TODO: maybe can optimize check print! doc 423 + use std::io::Write; 424 + std::io::stdout().flush().ok(); 425 + } 426 + } 427 + PiResponse::AgentEnd => { 428 + // agent streaming stopeed 429 + is_agent_streaming = false; 430 + break; 431 + } 432 + PiResponse::Response(response_msg) => { 433 + if response_msg.success { 434 + match response_msg.command { 435 + CommandType::Unknown => { 436 + continue; 437 + } 438 + cmd => process_command(cmd, response_msg.data)?, 439 + } 440 + } else { 441 + println!("Command failed") 442 + } 443 + break; 444 + } 445 + PiResponse::Unknown => { 446 + // Not handling now 447 + } 360 448 } 361 449 } 362 450 // loop { ··· 775 863 Ok(pi_process) 776 864 } 777 865 778 - fn read_events(process: Child) { 779 - for line in process.stdout.iter() {} 780 - } 781 - 782 - fn send_pi_command(pi_child_stdin: &mut ChildStdin, command: &str) -> Result<()> { 783 - let payload = json!({ 784 - "type": "get_state" 785 - }); 786 - 787 - let payload_str = format!("{}\n", serde_json::to_string(&payload)?); 866 + fn send_to_pi(pi_child_stdin: &mut ChildStdin, payload_json: Value) -> Result<()> { 867 + let payload_str = format!("{}\n", serde_json::to_string(&payload_json)?); 788 868 789 869 pi_child_stdin.write_all(payload_str.as_bytes()).unwrap(); 790 870 pi_child_stdin.flush()?; 791 871 Ok(()) 792 872 } 873 + 874 + fn get_command_payload(cmd: CommandType) -> Value { 875 + match cmd { 876 + CommandType::Unknown => { 877 + json!({ 878 + "type": "none" 879 + }) 880 + } 881 + CommandType::State => { 882 + json!({ 883 + "type": "get_state", 884 + }) 885 + } 886 + } 887 + } 888 + 889 + fn process_command(cmd: CommandType, data: Option<Value>) -> Result<()> { 890 + match cmd { 891 + CommandType::Unknown => (), 892 + CommandType::State => { 893 + let state: GetStateData = serde_json::from_value(data.unwrap())?; 894 + println!("{:?}", state); 895 + use std::io::Write; 896 + std::io::stdout().flush().ok(); 897 + } 898 + } 899 + Ok(()) 900 + }