A focused Docker Compose management web application.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: realtime stream disposal

Brooke 10a952eb 6339aef1

+77 -26
+2
packages/node/src/core/logs.rs
··· 33 33 } 34 34 35 35 let mut receiver = channel.subscribe(); 36 + debug!("Subscriber number {} subscribed to logs for project '{}'", channel.receiver_count(), project); 37 + 36 38 loop { 37 39 match receiver.recv().await { 38 40 Ok(event) => yield event,
+23 -11
packages/panel/src/lib/api/realtime.svelte.ts
··· 1 1 import { parseServerSentEvents, type ServerSentEvent } from "parse-sse"; 2 2 import type { components } from "./openapi"; 3 - import { error, sleep, warn } from "$lib"; 4 - import { patch } from "ultrapatch"; 3 + import { Backoff, error, warn } from "$lib"; 5 4 import { client, isAuthenticated } from "."; 6 5 import { goto } from "$app/navigation"; 6 + import { patch } from "ultrapatch"; 7 7 8 8 export type LuminaryStateList = components["schemas"]["luminary_node.core.model.LuminaryStateList"]; 9 9 export type LuminaryProject = components["schemas"]["luminary_node.core.model.LuminaryProject"]; 10 10 type LogMessage = components["schemas"]["luminary_node.logging.LogMessage"]; 11 - 12 - const INITIAL_RETRY_DELAY = 1000; 13 11 14 12 /** 15 13 * The current internal list of projects and their states. ··· 25 23 /** 26 24 * Subscribes to real-time updates from the server. 27 25 */ 28 - export async function subscribe(fetch?: typeof globalThis.fetch) { 29 - let retryDelay = INITIAL_RETRY_DELAY; 26 + export function subscribe(fetch?: typeof globalThis.fetch) { 27 + if (!isAuthenticated()) goto("/login"); 30 28 31 - if (!isAuthenticated()) goto("/login"); 29 + const controller = new AbortController(); 30 + listen(controller.signal, fetch); 31 + return () => controller.abort(); 32 + } 33 + 34 + /** 35 + * The main listning loop that connects to the server and processes incoming events. 36 + */ 37 + async function listen(signal: AbortSignal, fetch?: typeof globalThis.fetch) { 38 + const backoff = new Backoff(); 32 39 33 40 while (isAuthenticated()) { 34 41 try { 35 42 const { response } = await client.GET("/api/realtime", { 36 43 parseAs: "stream", 44 + signal, 37 45 fetch, 38 46 }); 39 47 40 - retryDelay = INITIAL_RETRY_DELAY; 48 + backoff.reset(); 41 49 list = {}; 42 50 43 51 try { 44 - for await (const event of parseServerSentEvents(response) as unknown as AsyncIterable<ServerSentEvent>) 52 + for await (const event of parseServerSentEvents( 53 + response, 54 + ) as unknown as AsyncIterable<ServerSentEvent>) { 55 + if (signal.aborted) return; 45 56 handleEvent(event); 57 + } 46 58 } catch (err) { 47 59 error("Connection to server lost", [ 48 60 "Your connection to the server was lost: " + (err instanceof Error ? err.message : String(err)), ··· 50 62 ]); 51 63 } 52 64 } catch (_) { 53 - await sleep(retryDelay); 54 - retryDelay = Math.min(retryDelay * 2, 30000); // Exponential backoff with a max of 30 seconds 65 + if (signal.aborted) return; 66 + await backoff.wait(); 55 67 } 56 68 } 57 69 }
+1 -1
packages/panel/src/lib/component/LoaderButton.svelte
··· 47 47 .loader { 48 48 width: 14px; 49 49 height: 14px; 50 - border: 3px solid #fff; 50 + border: 3px solid var(--text); 51 51 border-bottom-color: transparent; 52 52 border-radius: 50%; 53 53 display: inline-block;
+27 -8
packages/panel/src/lib/component/LogTerminal.svelte
··· 6 6 import { Terminal } from "@xterm/xterm"; 7 7 import "@xterm/xterm/css/xterm.css"; 8 8 import { client } from "$lib/api"; 9 + import { Backoff } from "$lib"; 9 10 10 11 let { project }: { project: string } = $props(); 11 12 ··· 21 22 const observer = new ResizeObserver(() => fitAddon.fit()); 22 23 observer.observe(el); 23 24 24 - stream(terminal); 25 + const abort = new AbortController(); 26 + stream(terminal, abort.signal); 25 27 26 28 return () => { 29 + abort.abort(); 27 30 observer.disconnect(); 28 31 terminal.dispose(); 29 32 }; 30 33 }; 31 34 32 - async function stream(terminal: Terminal) { 33 - const { response } = await client.GET("/api/project/{project}/logs", { 34 - params: { path: { project } }, 35 - parseAs: "stream", 36 - }); 35 + async function stream(terminal: Terminal, signal: AbortSignal) { 36 + const backoff = new Backoff(); 37 + 38 + while (true) { 39 + try { 40 + const { response } = await client.GET("/api/project/{project}/logs", { 41 + params: { path: { project } }, 42 + parseAs: "stream", 43 + signal, 44 + }); 45 + 46 + backoff.reset(); 47 + 48 + for await (const event of parseServerSentEvents( 49 + response, 50 + ) as unknown as AsyncIterable<ServerSentEvent>) { 51 + if (signal.aborted) return; 37 52 38 - for await (const event of parseServerSentEvents(response) as unknown as AsyncIterable<ServerSentEvent>) { 39 - terminal.write(Uint8Array.from(atob(event.data), (c) => c.charCodeAt(0))); 53 + terminal.write(Uint8Array.from(atob(event.data), (c) => c.charCodeAt(0))); 54 + } 55 + } catch (_) { 56 + if (signal.aborted) return; 57 + await backoff.wait(); 58 + } 40 59 } 41 60 } 42 61 </script>
+21
packages/panel/src/lib/index.ts
··· 22 22 export function sleep(ms: number) { 23 23 return new Promise((resolve) => setTimeout(resolve, ms)); 24 24 } 25 + 26 + export class Backoff { 27 + public currentDelay: number; 28 + public initialDelay: number; 29 + public maxDelay: number; 30 + 31 + public constructor(initialDelay: number = 1000, maxDelay: number = 30000) { 32 + this.currentDelay = initialDelay; 33 + this.initialDelay = initialDelay; 34 + this.maxDelay = maxDelay; 35 + } 36 + 37 + public reset() { 38 + this.currentDelay = this.initialDelay; 39 + } 40 + 41 + public async wait() { 42 + await sleep(this.currentDelay); 43 + this.currentDelay = Math.min(this.currentDelay * 2, this.maxDelay); 44 + } 45 + }
+3
packages/panel/src/routes/(authenticated)/+layout.svelte
··· 1 1 <script lang="ts"> 2 2 import Navbar from "./Navbar.svelte"; 3 + import { api } from "$lib"; 3 4 4 5 let { children } = $props(); 6 + 7 + $effect(() => api.subscribe(fetch)); 5 8 </script> 6 9 7 10 <div class="container">
-6
packages/panel/src/routes/(authenticated)/+layout.ts
··· 1 - import { api } from "$lib"; 2 - import type { LayoutLoad } from "./$types"; 3 - 4 - export const load: LayoutLoad = async ({ fetch }) => { 5 - api.subscribe(fetch); 6 - };