A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v4 223 lines 6.0 kB view raw
import {
  BroadcastableDiffuseElement,
  defineElement,
  query,
} from "~/common/element.js";
import { data, mergeTracks } from "~/common/output.js";
import { signal, untracked } from "~/common/signal.js";
import { listen } from "~/common/worker.js";

/**
 * @import {ProxiedActions} from "~/common/worker.d.ts"
 * @import {InputElement} from "~/components/input/types.d.ts"
 * @import {OutputElement} from "~/components/output/types.d.ts"
 * @import {Track} from "~/definitions/types.d.ts"
 * @import MetadataConfigurator from "~/components/configurator/metadata/element.js"
 *
 * @import {Actions, Progress} from "./types.d.ts"
 */

////////////////////////////////////////////
// ELEMENT
////////////////////////////////////////////

/**
 * Processes inputs into tracks whenever
 * the already existing tracks are loaded
 * from the assigned output element.
 *
 * The element looks up three collaborators through the `input-selector`,
 * `output-selector` and `metadata-selector` lookups, forwards the cached
 * tracks to its worker, and saves whatever the worker sends back
 * (`list`, `patch` and the final `process` result) to the output element.
 */
class ProcessTracksOrchestrator extends BroadcastableDiffuseElement {
  static NAME = "diffuse/orchestrator/process-tracks";
  static WORKER_URL = "components/orchestrator/process-tracks/worker.js";

  /** @type {ProxiedActions<Actions>} */
  #proxy;

  constructor() {
    super();
    this.#proxy = this.workerProxy({
      forceNew: {
        dependencies: { input: true },
      },
    });
  }

  // SIGNALS

  #isProcessing = signal(false);
  #performedInitialProcess = signal(false);
  #progress = signal(/** @type {Progress} */ ({ processed: 0, total: 0 }));

  // STATE

  isProcessing = this.#isProcessing.get;
  progress = this.#progress.get;

  // LIFECYCLE

  /**
   * Sets up broadcasting (when the `group` attribute is present), resolves
   * the input/output/metadata elements, wires up the worker listeners and,
   * when the `process-when-ready` attribute is present, triggers a single
   * automatic `process()` once the output's track collection is loaded.
   *
   * @override
   */
  async connectedCallback() {
    // Broadcast if needed
    if (this.hasAttribute("group")) {
      const actions = this.broadcast(this.identifier, {
        getPerfInit: {
          strategy: "leaderOnly",
          fn: this.#performedInitialProcess.get,
        },
        setPerfInit: {
          strategy: "replicate",
          fn: this.#performedInitialProcess.set,
        },
        getIsProcessing: {
          strategy: "leaderOnly",
          fn: this.#isProcessing.get,
        },
        setIsProcessing: {
          strategy: "replicate",
          fn: this.#isProcessing.set,
        },
        // NOTE(review): `this.process` is handed over unbound; presumably
        // `broadcast` takes care of the receiver — confirm.
        process: { strategy: "leaderOnly", fn: this.process },
      });

      // NOTE(review): this returns before `super.connectedCallback()` runs;
      // confirm that is intended when `broadcast` yields nothing.
      if (!actions) return;

      // From here on, processing and the processing flag go through
      // the broadcast actions instead of the local implementations.
      this.process = actions.process;
      this.#isProcessing.set = actions.setIsProcessing;

      // Sync #performedInitialProcess and #isProcessing with leader
      actions.getPerfInit().then((val) => {
        this.#performedInitialProcess.value = val;
      });

      actions.getIsProcessing().then((val) => {
        this.#isProcessing.value = val;
      });
    }

    // Super
    super.connectedCallback();

    /** @type {InputElement} */
    const input = query(this, "input-selector");

    /** @type {OutputElement} */
    const output = query(this, "output-selector");

    /** @type {MetadataConfigurator} */
    const metadataConfigurator = query(this, "metadata-selector");

    // Assign to self
    this.input = input;
    this.output = output;
    this.metadataConfigurator = metadataConfigurator;

    // Worker link
    const link = this.workerLink();

    // Wait until defined
    await customElements.whenDefined(input.localName);
    await customElements.whenDefined(output.localName);
    await customElements.whenDefined(metadataConfigurator.localName);

    // Sync progress with worker
    listen("progress", this.#progress.set, link);
    this.#proxy.progress().then(this.#progress.set);

    // Save the track list whenever the worker sends one.
    listen("list", /** @param {Track[]} tracks */ async (tracks) => {
      if (!this.output) return;
      this.output.tracks.save(tracks);
    }, link);

    // Save patched tracks as they arrive so progress isn't lost.
    // Merge with existing tracks to avoid overwriting ones not yet patched.
    listen("patch", /** @param {Track[]} tracks */ async (tracks) => {
      if (!this.output) return;
      const existing = await data(this.output.tracks);
      const merged = mergeTracks(existing, tracks);
      this.output.tracks.save(merged);
    }, link);

    // Process whenever tracks are initially loaded;
    // unless already done so (possibly through another instance of this element)
    if (this.hasAttribute("process-when-ready")) {
      // Set once the effect has done its job; the next run of the effect
      // then unregisters itself.
      let unregistered = false;

      const unregister = this.effect(() => {
        if (unregistered) {
          unregister();
          return;
        }

        const col = output.tracks.collection();
        if (col.state !== "loaded") return;

        if (this.#performedInitialProcess.value) {
          unregistered = true;
          return;
        }

        this.#performedInitialProcess.set(true);

        const skip = /** @type {any} */ (import.meta).env
          ?.DISABLE_AUTOMATIC_TRACKS_PROCESSING ?? false;
        if (skip) return;

        unregistered = true;
        untracked(() => this.process());
      });
    }
  }

  // WORKERS

  /**
   * Elements handed to the worker as dependencies.
   *
   * @override
   * @throws {Error} When the input or metadata configurator element
   *   has not been assigned yet.
   */
  dependencies() {
    if (!this.input) throw new Error("Input element not defined yet");
    if (!this.metadataConfigurator) {
      throw new Error("Metadata configurator element not defined yet");
    }

    return {
      input: this.input,
      metadata: this.metadataConfigurator,
    };
  }

  // ACTIONS

  /**
   * Sends the currently cached tracks to the worker and, when the worker
   * returns a result, saves it merged with the cached tracks.
   *
   * Does nothing when no output element is assigned or when processing is
   * already in progress. Errors from loading, processing or saving are
   * propagated to the caller; the processing flag is reset regardless so a
   * failed run does not block every subsequent call.
   *
   * @returns {Promise<void>}
   */
  async process() {
    if (!this.output) return;
    if (this.#isProcessing.value) return;

    // Start
    this.#isProcessing.set(true);
    console.log("🪵 Processing initiated");

    try {
      const cachedTracks = await data(this.output.tracks);
      const result = await this.#proxy.process(cachedTracks);

      if (result) {
        await this.output.tracks.save(mergeTracks(cachedTracks, result));
      }

      // Fin
      console.log("🪵 Processing completed");
    } finally {
      // Always release the flag, even when one of the awaits above rejects.
      this.#isProcessing.set(false);
    }
  }
}

export default ProcessTracksOrchestrator;

////////////////////////////////////////////
// REGISTER
////////////////////////////////////////////

// Expose the element class and its tag name,
// then pass both to `defineElement`.
export const CLASS = ProcessTracksOrchestrator;
export const NAME = "do-process-tracks";

defineElement(NAME, CLASS);