A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: processing batches

+112 -10
+82
src/common/output.js
··· 5 5 */ 6 6 7 7 /** 8 + * Merges two track arrays by `id`. Tracks from `incoming` replace any 9 + * matching tracks in `existing`; unmatched existing tracks are preserved. 10 + * 11 + * @template {{ id: string }} T 12 + * @param {T[]} existing 13 + * @param {T[]} incoming 14 + * @returns {T[]} 15 + * 16 + * @example Returns incoming tracks when existing is empty 17 + * ```js 18 + * import { mergeTracks } from "~/common/output.js"; 19 + * 20 + * const result = mergeTracks([], [{ id: "a" }, { id: "b" }]); 21 + * if (result.map(t => t.id).join(",") !== "a,b") throw new Error("unexpected result"); 22 + * ``` 23 + * 24 + * @example Returns existing tracks when incoming is empty 25 + * ```js 26 + * import { mergeTracks } from "~/common/output.js"; 27 + * 28 + * const result = mergeTracks([{ id: "a" }, { id: "b" }], []); 29 + * if (result.map(t => t.id).join(",") !== "a,b") throw new Error("unexpected result"); 30 + * ``` 31 + * 32 + * @example Preserves tracks not present in incoming 33 + * ```js 34 + * import { mergeTracks } from "~/common/output.js"; 35 + * 36 + * const result = mergeTracks([{ id: "a" }, { id: "b" }], [{ id: "c" }]); 37 + * if (result.map(t => t.id).join(",") !== "a,b,c") throw new Error("unexpected result"); 38 + * ``` 39 + * 40 + * @example Replaces existing track with incoming version when ids match 41 + * ```js 42 + * import { mergeTracks } from "~/common/output.js"; 43 + * 44 + * const result = mergeTracks([{ id: "a", uri: "old://a" }], [{ id: "a", uri: "new://a" }]); 45 + * if (result.length !== 1) throw new Error("expected length 1"); 46 + * if (result[0].uri !== "new://a") throw new Error("expected new uri"); 47 + * ``` 48 + * 49 + * @example Preserves other-source tracks when incoming covers one source 50 + * ```js 51 + * import { mergeTracks } from "~/common/output.js"; 52 + * 53 + * const existing = [ 54 + * { id: "s3-1", uri: "s3://bucket/a.flac" }, 55 + * { id: "s3-2", uri: "s3://bucket/b.flac" }, 56 + * { id: "wd-1", uri: "webdav://server/c.flac" }, 57 + * ]; 58 + * const incoming = [ 59 + * { id: "s3-1", uri: "s3://bucket/a.flac" }, 60 + * { id: "s3-3", uri: "s3://bucket/d.flac" }, 61 + * ]; 62 + * const result = mergeTracks(existing, incoming); 63 + * const sorted = result.map(t => t.id).sort().join(","); 64 + * if (sorted !== "s3-1,s3-2,s3-3,wd-1") throw new Error("unexpected result: " + sorted); 65 + * ``` 66 + * 67 + * @example Incoming tracks appear after preserved existing tracks 68 + * ```js 69 + * import { mergeTracks } from "~/common/output.js"; 70 + * 71 + * const result = mergeTracks([{ id: "x" }], [{ id: "y" }, { id: "z" }]); 72 + * if (result.map(t => t.id).join(",") !== "x,y,z") throw new Error("unexpected result"); 73 + * ``` 74 + * 75 + * @example Handles both arrays empty 76 + * ```js 77 + * import { mergeTracks } from "~/common/output.js"; 78 + * 79 + * const result = mergeTracks([], []); 80 + * if (result.length !== 0) throw new Error("expected empty result"); 81 + * ``` 82 + */ 83 + export function mergeTracks(existing, incoming) { 84 + const ids = new Set(incoming.map((t) => t.id)); 85 + const preserved = existing.filter((t) => !ids.has(t.id)); 86 + return [...preserved, ...incoming]; 87 + } 88 + 89 + /** 8 90 * @template T 9 91 * @param {{ collection: SignalReader<{ state: "loading" } | { state: "loaded"; data: T }> }} output 10 92 * @returns {Promise<T>}
+24 -8
src/components/orchestrator/process-tracks/element.js
··· 1 - import { BroadcastableDiffuseElement, defineElement, query } from "~/common/element.js"; 1 + import { 2 + BroadcastableDiffuseElement, 3 + defineElement, 4 + query, 5 + } from "~/common/element.js"; 6 + import { data, mergeTracks } from "~/common/output.js"; 2 7 import { signal, untracked } from "~/common/signal.js"; 3 8 import { listen } from "~/common/worker.js"; 4 9 ··· 6 11 * @import {ProxiedActions} from "~/common/worker.d.ts" 7 12 * @import {InputElement} from "~/components/input/types.d.ts" 8 13 * @import {OutputElement} from "~/components/output/types.d.ts" 14 + * @import {Track} from "~/definitions/types.d.ts" 9 15 * @import MetadataConfigurator from "~/components/configurator/metadata/element.js" 10 16 * 11 17 * @import {Actions, Progress} from "./types.d.ts" ··· 119 125 listen("progress", this.#progress.set, link); 120 126 this.#proxy.progress().then(this.#progress.set); 121 127 122 - // Save intermediate batches as they arrive so progress isn't lost 123 - listen("batch", (tracks) => this.output?.tracks.save(tracks), link); 128 + // 129 + listen("list", /** @param {Track[]} tracks */ async (tracks) => { 130 + if (!this.output) return; 131 + this.output.tracks.save(tracks); 132 + }, link); 133 + 134 + // Save intermediate batches as they arrive so progress isn't lost. 135 + // Merge with tracks not present in the batch to avoid overwriting them. 136 + listen("batch", /** @param {Track[]} tracks */ async (tracks) => { 137 + if (!this.output) return; 138 + const existing = await data(this.output.tracks); 139 + this.output.tracks.save(mergeTracks(existing, tracks)); 140 + }, link); 124 141 125 142 // Process whenever tracks are initially loaded; 126 143 // unless already done so (possibly through another instance of this element) ··· 151 168 untracked(() => this.process()); 152 169 }); 153 170 } 154 - 155 171 } 156 172 157 173 // WORKERS ··· 181 197 this.#isProcessing.set(true); 182 198 console.log("🪵 Processing initiated"); 183 199 184 - const tracksCol = this.output.tracks.collection(); 185 - const cachedTracks = tracksCol.state === "loaded" ? tracksCol.data : []; 200 + const cachedTracks = await data(this.output.tracks); 186 201 const result = await this.#proxy.process(cachedTracks); 187 202 188 - // Save if collection changed 189 - if (result) await this.output.tracks.save(result); 203 + if (result) { 204 + await this.output.tracks.save(mergeTracks(cachedTracks, result)); 205 + } 190 206 191 207 // Fin 192 208 console.log("🪵 Processing completed");
+6 -2
src/components/orchestrator/process-tracks/worker.js
··· 29 29 * @param {any} context 30 30 * @returns {ActionsWithTunnel<Actions>["process"]} 31 31 */ 32 - const process = (context) => /** @type {ActionsWithTunnel<Actions>["process"]} */ (async ({ data, ports }) => { 32 + const process = ( 33 + context, 34 + ) => /** @type {ActionsWithTunnel<Actions>["process"]} */ (async ( 35 + { data, ports }, 36 + ) => { 33 37 const cachedTracks = data; 34 38 35 39 // Reset progress ··· 50 54 // Persist the full track list immediately so that an interrupted metadata 51 55 // processing run doesn't lose discovered tracks. On next run they'll come 52 56 // back as cachedTracks and only the ones without metadata need reprocessing. 53 - announce("batch", tracks, context); 57 + announce("list", tracks, context); 54 58 55 59 // Reset progress 56 60 $progress.value = { processed: 0, total: tracks.length };