A focused Docker Compose management web application.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: serverside log multiplexing

Brooke 1ed10e16 c47a7787

+156 -65
+15 -7
packages/node/src/api/realtime/logs.rs
··· 14 14 sse::{self, SseEvent}, 15 15 }; 16 16 17 - use crate::{core::LuminaryEngine, eyre_fmt, obtain}; 17 + use crate::{ 18 + core::{LuminaryEngine, ProjectLogChannelMessage}, 19 + eyre_fmt, obtain, 20 + }; 18 21 19 22 /// Subscribes to a stream of log messages for a given project, sent as Server-Sent Events. 20 23 #[endpoint( ··· 30 33 let engine = obtain!(depot, LuminaryEngine); 31 34 let project = project.into_inner(); 32 35 33 - let mut stream = engine.logs_subscribe(project).await; 36 + let mut stream = engine.clone().logs_subscribe(project).await; 34 37 35 38 sse::stream( 36 39 res, 37 40 async_stream::stream! { 38 - while let Some(bytes) = stream.next().await { 39 - match create_event(&bytes).wrap_err("Failed to create SSE event from log bytes") { 41 + while let Some(message) = stream.next().await { 42 + match create_event(message).wrap_err("Failed to create SSE event from log bytes") { 40 43 Err(err) => log::error!("{}", eyre_fmt!(err)), 41 44 Ok(event) => yield Ok::<SseEvent, Infallible>(event), 42 45 } ··· 46 49 } 47 50 48 51 /// Creates a Server-Sent Event from a chunk of log bytes. 49 - fn create_event(bytes: &[u8]) -> Result<SseEvent, Infallible> { 50 - let encoded = STANDARD.encode(bytes); 51 - return Ok(SseEvent::default().text(encoded)); 52 + fn create_event(message: ProjectLogChannelMessage) -> Result<SseEvent, Infallible> { 53 + return match message { 54 + ProjectLogChannelMessage::Close(uuid) => Ok(SseEvent::default().id("close").text(uuid)), 55 + ProjectLogChannelMessage::Write(uuid, bytes) => { 56 + let encoded = STANDARD.encode(bytes); 57 + Ok(SseEvent::default().id(uuid).text(encoded)) 58 + } 59 + }; 52 60 }
+17 -11
packages/node/src/core/action.rs
··· 9 9 #[wrap_err("Failed to set action for service")] 10 10 pub(super) async fn set_action( 11 11 &self, 12 - project: &str, 13 - service: Option<&str>, 12 + project: &String, 13 + service: Option<&String>, 14 14 action: LuminaryAction, 15 15 ) -> Result<()> { 16 16 // Get list of targets to update ··· 52 52 async fn run( 53 53 &self, 54 54 action: LuminaryAction, 55 - project: &str, 56 - service: Option<&str>, 55 + project: &String, 56 + service: Option<&String>, 57 57 mut args: Vec<&str>, 58 58 ) -> Result<()> { 59 59 self.set_action(project, service, action).await?; ··· 63 63 } 64 64 65 65 let mut stream = self.cli(&project, args)?; 66 - while let Some(_) = stream.next().await {} 66 + let sender = self.create_log_sender(project.clone()).await; 67 + 68 + while let Some(bytes) = stream.next().await { 69 + sender.write(bytes?).await; 70 + } 71 + 67 72 self.set_action(project, service, LuminaryAction::Idle).await?; 73 + sender.close().await; 68 74 return Ok(()); 69 75 } 70 76 71 77 /// Restarts the given project and optionally, a specific service within that project. 72 78 #[wrap_err("Failed to restart project/service")] 73 - pub async fn restart(&self, project: &str, service: Option<&str>) -> Result<()> { 79 + pub async fn restart(&self, project: &String, service: Option<&String>) -> Result<()> { 74 80 self.run(LuminaryAction::Restarting, project, service, vec!["restart"]) 75 81 .await?; 76 82 Ok(()) ··· 78 84 79 85 /// Starts the given project and optionally, a specific service within that project. 80 86 #[wrap_err("Failed to start project/service")] 81 - pub async fn start(&self, project: &str, service: Option<&str>) -> Result<()> { 87 + pub async fn start(&self, project: &String, service: Option<&String>) -> Result<()> { 82 88 self.run(LuminaryAction::Starting, project, service, vec!["up", "-d"]) 83 89 .await?; 84 90 Ok(()) ··· 86 92 87 93 /// Stops the given project and optionally, a specific service within that project. 88 94 #[wrap_err("Failed to stop project/service")] 89 - pub async fn stop(&self, project: &str, service: Option<&str>) -> Result<()> { 95 + pub async fn stop(&self, project: &String, service: Option<&String>) -> Result<()> { 90 96 self.run( 91 97 LuminaryAction::Stopping, 92 98 project, ··· 99 105 100 106 /// Recreates the given project and optionally, a specific service within that project. 101 107 #[wrap_err("Failed to recreate project/service")] 102 - pub async fn recreate(&self, project: &str, service: Option<&str>) -> Result<()> { 108 + pub async fn recreate(&self, project: &String, service: Option<&String>) -> Result<()> { 103 109 self.stop(project, service).await?; 104 110 self.start(project, service).await?; 105 111 Ok(()) ··· 107 113 108 114 /// Pulls the latest images for the given project and optionally, a specific service within that project. 109 115 #[wrap_err("Failed to pull project/service images")] 110 - pub async fn pull(&self, project: &str, service: Option<&str>) -> Result<()> { 116 + pub async fn pull(&self, project: &String, service: Option<&String>) -> Result<()> { 111 117 self.run( 112 118 LuminaryAction::Pulling, 113 119 project, ··· 120 126 121 127 /// Builds the images for the given project and optionally, a specific service within that project. 122 128 #[wrap_err("Failed to build project/service images")] 123 - pub async fn build(&self, project: &str, service: Option<&str>) -> Result<()> { 129 + pub async fn build(&self, project: &String, service: Option<&String>) -> Result<()> { 124 130 self.run( 125 131 LuminaryAction::Building, 126 132 project,
+101 -41
packages/node/src/core/logs.rs
··· 1 - use std::sync::Arc; 2 - 3 1 use crate::{ 4 - core::{LuminaryEngine, LuminaryStatus, ProjectLogChannel}, 2 + core::{LuminaryEngine, LuminaryStatus, ProjectLogChannel, ProjectLogChannelMessage}, 5 3 eyre_fmt, 6 4 }; 7 5 use bytes::{Bytes, BytesMut}; 8 6 use eyre::Context; 9 7 use futures_util::{StreamExt, stream::BoxStream}; 10 8 use log::{debug, error}; 11 - use tokio::sync::{RwLock, broadcast}; 9 + use tokio::sync::{broadcast}; 10 + use uuid::Uuid; 12 11 13 12 const EMPTY_LOGS_MESSAGE: &[u8] = b"No logs to show. Waiting for project to start...\n\r"; 13 + const LOG_WORKER_STREAM_UUID: Uuid = Uuid::from_u128(0x0); 14 14 15 15 impl LuminaryEngine { 16 16 /// Creates a stream of [Bytes] for clients to subscribe to. 17 - pub async fn logs_subscribe<'a>(&'_ self, project: String) -> BoxStream<'a, Bytes> { 18 - // Obtain entry for the project, creating a new one if neccessary 19 - let ProjectLogChannel { channel, buffer } = self 20 - .log_channels 21 - .lock() 22 - .await 23 - .entry(project.clone()) 24 - .or_insert_with(|| self.spawn_log_worker(project.clone())) 25 - .clone(); 17 + pub async fn logs_subscribe<'a>(self, project: String) -> BoxStream<'a, ProjectLogChannelMessage> { 18 + let ProjectLogChannel { channel, state } = self.get_log_channel(&project).await; 26 19 27 20 return async_stream::stream! { 28 21 // Surround with a block to drop read guard after reading buffer 29 - { 22 + { 23 + // Spawn log worker if it hasn't been spawned already. 24 + // Use temporary read guard as spawn_log_worker needs a write guard 25 + if !state.read().await.contains_key(&LOG_WORKER_STREAM_UUID) { 26 + self.spawn_log_worker(project.clone()).await; 27 + } 28 + 30 29 // Send previous logs in buffer to bring client up to date 31 - let bytes = &buffer.read().await; 32 - if !bytes.is_empty() { 33 - yield <BytesMut as Clone>::clone(&bytes).freeze() 34 - } 30 + for (uuid, buffer) in state.read().await.iter() { 31 + if !buffer.is_empty() { 32 + yield ProjectLogChannelMessage::Write(*uuid, <BytesMut as Clone>::clone(&buffer).freeze()); 33 + } 34 + } 35 35 } 36 36 37 37 let mut receiver = channel.subscribe(); ··· 48 48 .boxed(); 49 49 } 50 50 51 + /// Creates a new log sender for the given project. This will send a close message when dropped. 52 + pub async fn create_log_sender(&self, project: String) -> ProjectLogChannelSender { 53 + let channel = self.get_log_channel(&project).await; 54 + 55 + ProjectLogChannelSender { 56 + uuid: Uuid::new_v4(), 57 + engine: self.clone(), 58 + project: project, 59 + channel, 60 + } 61 + } 62 + 63 + /// Obtain the [ProjectLogChannel] for the given project, creating a new one if neccessary 64 + async fn get_log_channel(&self, project: &String) -> ProjectLogChannel { 65 + return self 66 + .log_channels 67 + .lock() 68 + .await 69 + .entry(project.clone()) 70 + .or_default() 71 + .clone(); 72 + } 73 + 51 74 /// Spawns a background worker that listens for logs sends them to clients. 52 - fn spawn_log_worker(&self, project: String) -> ProjectLogChannel { 75 + async fn spawn_log_worker(&self, project: String) { 53 76 let this = self.clone(); 77 + 78 + let channel = self.get_log_channel(&project).await; 79 + channel.state.write().await.insert(LOG_WORKER_STREAM_UUID, BytesMut::new()); 54 80 55 - let entry = ProjectLogChannel { 56 - channel: broadcast::channel(64).0, 57 - buffer: Arc::new(RwLock::new(BytesMut::new())), 58 - }; 81 + let mut sender = this.create_log_sender(project.clone()).await; 82 + sender.uuid = LOG_WORKER_STREAM_UUID; // Use reserved UUID for log worker stream 59 83 60 - let ProjectLogChannel { channel, buffer } = entry.clone(); 61 84 tokio::spawn(async move { 85 + 62 86 loop { 63 87 debug!("Starting logs stream for project '{}'...", project); 64 88 // Spawn docker compose process, yielding logs as they are recieved ··· 72 96 match result.wrap_err("Error streaming logs for project") { 73 97 Err(err) => error!("{}", eyre_fmt!(err)), 74 98 Ok(bytes) => { 75 - let bytes = normalise_line_endings(&bytes); 76 - 77 - // Update buffer with logs and send to subscribers 78 - buffer.write().await.extend_from_slice(&bytes); 79 - if channel.send(bytes).is_err() { 80 - // There are no subscribers, so clean up and stop the worker 81 - debug!("Cleaning up logs stream for project '{}'...", project); 82 - this.log_channels.lock().await.remove(&project); 83 - return; 99 + if sender.write(bytes).await { 100 + sender.close().await; 101 + break; 84 102 } 85 103 } 86 104 } ··· 92 110 debug!("Docker compose logs process exited, waiting for event to trigger retry..."); 93 111 94 112 // Send a message to clients if there are no logs to show 95 - { 96 - let mut buffer = buffer.write().await; 97 - if buffer.is_empty() { 98 - buffer.extend_from_slice(EMPTY_LOGS_MESSAGE); 99 - let _ = channel.send(Bytes::from(EMPTY_LOGS_MESSAGE)); 100 - } 113 + if sender.is_empty().await { 114 + sender.write(Bytes::from(EMPTY_LOGS_MESSAGE)).await; 101 115 } 102 116 103 117 loop { ··· 114 128 debug!("Received event indicating project is running, restarting logs stream..."); 115 129 116 130 // Clear buffer to avoid sending old logs 117 - buffer.write().await.clear(); 131 + sender.clear().await; 118 132 } 119 133 }); 134 + } 135 + } 120 136 121 - return entry; 137 + /// A wrapper representing a given project log stream, to be multiplexed and sent to clients. 138 + #[derive(Debug)] 139 + pub struct ProjectLogChannelSender { 140 + channel: ProjectLogChannel, 141 + engine: LuminaryEngine, 142 + project: String, 143 + uuid: Uuid, 144 + } 145 + 146 + impl ProjectLogChannelSender { 147 + //// Sends logs to clients and updates internal buffer. Returns true if there are no subscribers to receive the logs. 148 + pub async fn write(&self, bytes: Bytes) -> bool { 149 + let bytes = normalise_line_endings(&bytes); 150 + 151 + // Update buffer with new bytes 152 + self.channel.state.write().await.entry(self.uuid).or_default().extend_from_slice(&bytes); 153 + 154 + // Send bytes to clients, and record if there are no subscribers 155 + return self.channel.channel.send(ProjectLogChannelMessage::Write(self.uuid, bytes)).is_err(); 156 + } 157 + 158 + /// Clears the internal buffer for this log stream. 159 + pub async fn clear(&self) { 160 + self.channel.state.write().await.entry(self.uuid).or_default().clear(); 161 + } 162 + 163 + /// Checks if the internal buffer for this log stream is empty. 164 + pub async fn is_empty(&self) -> bool { 165 + return self.channel.state.read().await.get(&self.uuid).map(|buffer| buffer.is_empty()).unwrap_or(true); 166 + } 167 + 168 + /// Closes this log sender, removing its buffer and notifying clients. 169 + pub async fn close(&self) { 170 + debug!("Closing log sender '{}' for project '{}'", &self.uuid, &self.project); 171 + let mut streams = self.channel.state.write().await; 172 + streams.remove(&self.uuid); 173 + 174 + if self.channel.channel.send(ProjectLogChannelMessage::Close(self.uuid)).is_err() { 175 + if streams.is_empty() { 176 + debug!("No more log streams for project '{}', removing log channel...", &self.project); 177 + self.engine.log_channels.lock().await.remove(&self.project); 178 + } 179 + } else if streams.is_empty() { 180 + error!("Log channel for '{}' has no streams but still has subscribers, report this!", &self.project); 181 + } 122 182 } 123 183 } 124 184
+20 -3
packages/node/src/core/model.rs
··· 1 1 //! This module defines the core data models used within the Luminary application. 2 2 3 - use std::{fmt::Display, sync::Arc}; 3 + use std::{collections::HashMap, fmt::Display, sync::Arc}; 4 4 5 5 use bytes::{Bytes, BytesMut}; 6 6 use luminary_macros::hashmap_schema; 7 7 use salvo::oapi::{BasicType, Components, Object, RefOr, Schema, ToSchema}; 8 8 use serde::{Deserialize, Serialize, ser::SerializeStruct}; 9 9 use tokio::sync::{RwLock, broadcast}; 10 + use uuid::Uuid; 10 11 11 12 use crate::schema_ref_or; 12 13 ··· 180 181 /// This is created lazily when a client subscribes to logs for a project. 181 182 #[derive(Debug, Clone)] 182 183 pub struct ProjectLogChannel { 183 - pub channel: broadcast::Sender<Bytes>, 184 + pub channel: broadcast::Sender<ProjectLogChannelMessage>, 184 185 // Using an Arc here to allow the worker to keep a reference to the log buffer 185 - pub buffer: Arc<RwLock<BytesMut>>, 186 + pub state: Arc<RwLock<HashMap<Uuid, BytesMut>>>, 187 + } 188 + 189 + impl Default for ProjectLogChannel { 190 + fn default() -> Self { 191 + Self { 192 + state: Arc::new(RwLock::new(HashMap::new())), 193 + channel: broadcast::channel(64).0, 194 + } 195 + } 196 + } 197 + 198 + /// A message type for multiplexing log streams. 199 + #[derive(Debug, Clone)] 200 + pub enum ProjectLogChannelMessage { 201 + Write(Uuid, Bytes), 202 + Close(Uuid), 186 203 } 187 204 188 205 /// The configuration for updating a project. Allows for multiple updates at once.
+3 -3
packages/node/src/core/project.rs
··· 33 33 34 34 /// Validates a compose file by attempting to parse it and performing basic checks on the structure. 35 35 #[wrap_err("Invalid compose file")] 36 - fn validate_compose(&self, compose: &str) -> Result<()> { 36 + fn validate_compose(&self, compose: &String) -> Result<()> { 37 37 let compose = serde_saphyr::from_str::<Compose>(compose).wrap_err("Failed to parse compose file")?; 38 38 39 39 if compose.services.is_empty() { ··· 44 44 } 45 45 46 46 #[wrap_err("Failed to delete project")] 47 - pub async fn delete_project(&self, project: &str) -> Result<()> { 47 + pub async fn delete_project(&self, project: &String) -> Result<()> { 48 48 let (project_path, _) = self.get_path(project); 49 49 50 50 self.stop(project, None).await?; ··· 63 63 } 64 64 65 65 /// Updates the given project by applying the provided patch 66 - pub async fn patch_project(&self, project: &str, patch: &LuminaryProjectPatch) -> Result<()> { 66 + pub async fn patch_project(&self, project: &String, patch: &LuminaryProjectPatch) -> Result<()> { 67 67 // Validate request 68 68 if project.len() == 0 || patch.to.as_ref().is_some_and(|name| name.len() == 0) { 69 69 bail!("Project name cannot be empty");