···6262 args.push(service);
6363 }
64646565- let mut stream = self.cli(&project, args)?;
6565+ let mut stream = self.run_pty(&project, args)?;
6666 let sender = self.create_log_sender(project.clone()).await;
67676868 while let Some(bytes) = stream.next().await {
+51-47
packages/node/src/core/engine.rs
···11//! The core engine of the Luminary application, containing shared state and configuration.
2233-use std::{collections::HashMap, path::Path, process::Stdio, sync::Arc};
33+use std::{collections::HashMap, io::Read, path::Path, sync::Arc};
4455use async_stream::stream;
66use bollard::Docker;
77use bytes::Bytes;
88use eyre::{Context, Result, bail};
99use futures_util::{StreamExt, stream::BoxStream};
1010+use log::error;
1011use luminary_macros::wrap_err;
1111-use tokio::{
1212- io::AsyncReadExt,
1313- process::Command,
1414- sync::{Mutex, RwLock, broadcast},
1515-};
1212+use portable_pty::{CommandBuilder, PtySize, native_pty_system};
1313+use tokio::sync::{Mutex, RwLock, broadcast, mpsc};
16141715use crate::{
1816 configuration::LuminaryConfiguration,
1917 core::{LuminaryProjectList, ProjectLogChannel},
1818+};
1919+2020+const PTY_SIZE: PtySize = PtySize {
2121+ rows: 40,
2222+ cols: 80,
2323+ pixel_width: 18,
2424+ pixel_height: 18,
2025};
21262227/// The core engine of the Luminary application, containing shared state and configuration.
···6671 }
6772 }
68736969- /// Spawns a `docker compose` process and returns a stream of raw bytes merging both stdout and stderr
7070- pub(super) fn cli<'a>(
7474+ /// Spawns a `docker compose` process and returns a stream of raw bytes from its output.
7575+ pub(super) fn run_pty<'a>(
7176 &self,
7277 name: &'a str,
7378 args: impl IntoIterator<Item = &'a str>,
···7883 bail!("Project '{}' does not exist", name);
7984 }
80858181- let mut child = Command::new("docker")
8282- .current_dir(path)
8383- .arg("compose")
8484- .args(["--ansi", "always"])
8585- .args(args)
8686- .stdout(Stdio::piped())
8787- .stderr(Stdio::piped())
8888- .spawn()
8989- .wrap_err("Failed to spawn docker compose process")?;
8686+ let pty = native_pty_system()
8787+ .openpty(PTY_SIZE)
8888+ .map_err(|e| eyre::eyre!("Failed to open PTY: {e}"))?;
90899191- let mut stdout = child.stdout.take().expect("stdout was piped");
9292- let mut stderr = child.stderr.take().expect("stderr was piped");
9090+ let mut command = CommandBuilder::new("docker");
9191+ command.cwd(path);
9292+ command.arg("compose");
9393+ command.args(args);
93949494- Ok(stream! {
9595- let mut stdout_buf = vec![0u8; 4096];
9696- let mut stderr_buf = vec![0u8; 4096];
9797- let mut stdout_done = false;
9898- let mut stderr_done = false;
9595+ let mut child = pty
9696+ .slave
9797+ .spawn_command(command)
9898+ .map_err(|err| eyre::eyre!("Failed to spawn docker compose process: {}", err))?;
9999100100- while !stdout_done || !stderr_done {
101101- tokio::select! {
102102- result = stdout.read(&mut stdout_buf), if !stdout_done => {
103103- match result {
104104- Ok(0) => stdout_done = true,
105105- Ok(n) => yield Ok(Bytes::copy_from_slice(&stdout_buf[..n])),
106106- Err(e) => {
107107- yield Err(eyre::eyre!(e).wrap_err("Failed to read stdout"));
108108- stdout_done = true;
109109- }
110110- }
111111- }
112112- result = stderr.read(&mut stderr_buf), if !stderr_done => {
113113- match result {
114114- Ok(0) => stderr_done = true,
115115- Ok(n) => yield Ok(Bytes::copy_from_slice(&stderr_buf[..n])),
116116- Err(e) => {
117117- yield Err(eyre::eyre!(e).wrap_err("Failed to read stderr"));
118118- stderr_done = true;
119119- }
120120- }
100100+ let mut reader = pty
101101+ .master
102102+ .try_clone_reader()
103103+ .map_err(|err| eyre::eyre!("Failed to create PTY output reader: {}", err))?;
104104+105105+ // Close the slave so reader recieves EOF
106106+ drop(pty.slave);
107107+108108+ let (sender, mut reciever) = mpsc::channel::<Result<Bytes>>(64);
109109+110110+ tokio::task::spawn_blocking(move || {
111111+ let mut buf = vec![0u8; 4096];
112112+113113+ loop {
114114+ match reader.read(&mut buf) {
115115+ Err(err) => error!("Error reading from PTY: {}", err),
116116+ Ok(0) => break,
117117+ Ok(n) => {
118118+ sender.blocking_send(Ok(Bytes::copy_from_slice(&buf[..n]))).ok();
121119 }
122120 }
123121 }
124122125125- child.wait().await.ok();
123123+ child.wait().ok();
124124+ });
125125+126126+ return Ok(stream! {
127127+ while let Some(chunk) = reciever.recv().await {
128128+ yield chunk;
129129+ }
126130 }
127127- .boxed())
131131+ .boxed());
128132 }
129133}
+1-1
packages/node/src/core/logs.rs
···8787 debug!("Starting logs stream for project '{}'...", project);
8888 // Spawn docker compose process, yielding logs as they are recieved
8989 match this
9090- .cli(&project, ["logs", "-f"])
9090+ .run_pty(&project, ["logs", "-f"])
9191 .wrap_err("Failed to start docker compose logs process")
9292 {
9393 Err(err) => error!("{}", eyre_fmt!(err)),
+2-2
packages/node/src/core/project.rs
···150150151151 if recreate {
152152 // Use manual command as action is currently "patching"
153153- let mut stream = self.cli(&from, vec!["down", "--remove-orphans"])?;
153153+ let mut stream = self.run_pty(&from, vec!["down", "--remove-orphans"])?;
154154 while let Some(_) = stream.next().await {}
155155 };
156156···159159 .wrap_err("Failed to rename project directory")?;
160160161161 if recreate {
162162- let mut stream = self.cli(&to, vec!["up", "-d"])?;
162162+ let mut stream = self.run_pty(&to, vec!["up", "-d"])?;
163163 while let Some(_) = stream.next().await {}
164164 }
165165