// Forked from tokono.ma/diffuse
// A music player that connects to your cloud/distributed storage.
1import {
2 BroadcastableDiffuseElement,
3 defineElement,
4 query,
5} from "~/common/element.js";
6import { data, mergeTracks } from "~/common/output.js";
7import { signal, untracked } from "~/common/signal.js";
8import { listen } from "~/common/worker.js";
9
10/**
11 * @import {ProxiedActions} from "~/common/worker.d.ts"
12 * @import {InputElement} from "~/components/input/types.d.ts"
13 * @import {OutputElement} from "~/components/output/types.d.ts"
14 * @import {Track} from "~/definitions/types.d.ts"
15 * @import MetadataConfigurator from "~/components/configurator/metadata/element.js"
16 *
17 * @import {Actions, Progress} from "./types.d.ts"
18 */
19
20////////////////////////////////////////////
21// ELEMENT
22////////////////////////////////////////////
23
/**
 * Processes inputs into tracks whenever
 * the already existing tracks are loaded
 * from the assigned output element.
 *
 * Behaviour toggled by attributes checked in `connectedCallback`:
 * - `group`: registers state accessors and the `process` action through
 *   `this.broadcast`, and syncs local state from the values it returns.
 * - `process-when-ready`: runs `process()` once after the output's track
 *   collection reports the state `"loaded"`.
 *
 * The input, output and metadata configurator elements are resolved with
 * `query(this, "input-selector" | "output-selector" | "metadata-selector")`.
 */
class ProcessTracksOrchestrator extends BroadcastableDiffuseElement {
  static NAME = "diffuse/orchestrator/process-tracks";
  static WORKER_URL = "components/orchestrator/process-tracks/worker.js";

  /**
   * Proxy used to call worker actions (`process`, `progress`).
   *
   * @type {ProxiedActions<Actions>}
   */
  #proxy;

  constructor() {
    super();
    this.#proxy = this.workerProxy({
      forceNew: {
        dependencies: { input: true },
      },
    });
  }

  // SIGNALS

  /** True while a `process()` pass is running. */
  #isProcessing = signal(false);

  /** True once the automatic "process when ready" step has been claimed. */
  #performedInitialProcess = signal(false);

  /** Latest progress reported by the worker. */
  #progress = signal(/** @type {Progress} */ ({ processed: 0, total: 0 }));

  // STATE

  /** Read-only accessor for the processing flag. */
  isProcessing = this.#isProcessing.get;

  /** Read-only accessor for the worker progress. */
  progress = this.#progress.get;

  // LIFECYCLE

  /**
   * Sets up broadcasting (when grouped), resolves the related elements,
   * wires the worker events to the output, and optionally schedules the
   * initial processing pass.
   *
   * @override
   */
  async connectedCallback() {
    // Broadcast if needed
    if (this.hasAttribute("group")) {
      // NOTE(review): `fn: this.process` is handed over unbound; this assumes
      // `broadcast` invokes it with this element as receiver — confirm.
      const actions = this.broadcast(this.identifier, {
        getPerfInit: {
          strategy: "leaderOnly",
          fn: this.#performedInitialProcess.get,
        },
        setPerfInit: {
          strategy: "replicate",
          fn: this.#performedInitialProcess.set,
        },
        getIsProcessing: {
          strategy: "leaderOnly",
          fn: this.#isProcessing.get,
        },
        setIsProcessing: {
          strategy: "replicate",
          fn: this.#isProcessing.set,
        },
        process: { strategy: "leaderOnly", fn: this.process },
      });

      // NOTE(review): returning here also skips `super.connectedCallback()`
      // and all of the setup below — confirm that is intended.
      if (!actions) return;

      // From here on, `process()` and the processing-flag setter go through
      // the broadcast actions instead of the local implementations.
      this.process = actions.process;
      this.#isProcessing.set = actions.setIsProcessing;

      // Sync #performedInitialProcess and #isProcessing with leader
      actions.getPerfInit().then((val) => {
        this.#performedInitialProcess.value = val;
      });

      actions.getIsProcessing().then((val) => {
        this.#isProcessing.value = val;
      });
    }

    // Super
    super.connectedCallback();

    /** @type {InputElement} */
    const input = query(this, "input-selector");

    /** @type {OutputElement} */
    const output = query(this, "output-selector");

    /** @type {MetadataConfigurator} */
    const metadataConfigurator = query(this, "metadata-selector");

    // Assign to self
    this.input = input;
    this.output = output;
    this.metadataConfigurator = metadataConfigurator;

    // Worker link
    const link = this.workerLink();

    // Wait until defined
    await customElements.whenDefined(input.localName);
    await customElements.whenDefined(output.localName);
    await customElements.whenDefined(metadataConfigurator.localName);

    // Sync progress with worker:
    // subscribe to updates and fetch the current value once.
    listen("progress", this.#progress.set, link);
    this.#proxy.progress().then(this.#progress.set);

    // Save the track list received from the worker to the output as is.
    listen("list", /** @param {Track[]} tracks */ async (tracks) => {
      if (!this.output) return;
      this.output.tracks.save(tracks);
    }, link);

    // Save patched tracks as they arrive so progress isn't lost.
    // Merge with existing tracks to avoid overwriting ones not yet patched.
    listen("patch", /** @param {Track[]} tracks */ async (tracks) => {
      if (!this.output) return;
      const existing = await data(this.output.tracks);
      const merged = mergeTracks(existing, tracks);
      this.output.tracks.save(merged);
    }, link);

    // Process whenever tracks are initially loaded;
    // unless already done so (possibly through another instance of this element)
    if (this.hasAttribute("process-when-ready")) {
      // The effect only marks itself as finished; the actual `unregister()`
      // call happens at the start of its next run.
      let unregistered = false;

      const unregister = this.effect(() => {
        if (unregistered) {
          unregister();
          return;
        }

        const col = output.tracks.collection();
        if (col.state !== "loaded") return;

        // Initial processing was already claimed, nothing left to do.
        if (this.#performedInitialProcess.value) {
          unregistered = true;
          return;
        }

        this.#performedInitialProcess.set(true);

        // Opt-out through `import.meta.env.DISABLE_AUTOMATIC_TRACKS_PROCESSING`.
        const skip = /** @type {any} */ (import.meta).env
          ?.DISABLE_AUTOMATIC_TRACKS_PROCESSING ?? false;
        if (skip) return;

        unregistered = true;
        untracked(() => this.process());
      });
    }
  }

  // WORKERS

  /**
   * Elements handed over as worker dependencies.
   *
   * @override
   * @throws {Error} When the input or metadata configurator element
   *   has not been assigned yet (see `connectedCallback`).
   */
  dependencies() {
    if (!this.input) throw new Error("Input element not defined yet");
    if (!this.metadataConfigurator) {
      throw new Error("Metadata configurator element not defined yet");
    }

    return {
      input: this.input,
      metadata: this.metadataConfigurator,
    };
  }

  // ACTIONS

  /**
   * Runs a processing pass: reads the output's current tracks through
   * `data`, passes them to the worker proxy, and, when the worker returns a
   * truthy result, saves that result merged with the tracks that were read.
   *
   * Does nothing when no output is assigned or a pass is already running.
   * The processing flag is cleared even when reading, processing or saving
   * throws, so one failed pass cannot block every later call; the error
   * itself still propagates to the caller.
   *
   * @returns {Promise<void>}
   */
  async process() {
    if (!this.output) return;
    if (this.#isProcessing.value) return;

    // Start
    this.#isProcessing.set(true);
    console.log("🪵 Processing initiated");

    try {
      const cachedTracks = await data(this.output.tracks);
      const result = await this.#proxy.process(cachedTracks);

      if (result) {
        await this.output.tracks.save(mergeTracks(cachedTracks, result));
      }

      console.log("🪵 Processing completed");
    } finally {
      // Fin
      this.#isProcessing.set(false);
    }
  }
}
213
// The orchestrator class is the module's default export.
export default ProcessTracksOrchestrator;

////////////////////////////////////////////
// REGISTER
////////////////////////////////////////////

/** Name under which the class is passed to `defineElement`. */
export const NAME = "do-process-tracks";

/** The element class, exported under a generic name. */
export const CLASS = ProcessTracksOrchestrator;

defineElement(NAME, CLASS);