// Forked from tokono.ma/diffuse:
// a music player that connects to your cloud/distributed storage.
1import { decode, encode } from "@atcute/cbor";
2import { ifDefined } from "lit-html/directives/if-defined.js";
3import deepDiff from "@fry69/deep-diff";
4
5import "~/components/output/polymorphic/indexed-db/element.js";
6
7import * as CID from "~/common/cid.js";
8import { diff, strictEquality } from "~/common/compare.js";
9import { computed, signal } from "~/common/signal.js";
10import { compareTimestamps } from "~/common/temporal.js";
11import { OutputTransformer } from "../../base.js";
12import { defineElement } from "~/common/element.js";
13
14/**
15 * @import { SignalReader } from "~/common/signal.d.ts";
16 * @import { RenderArg } from "~/common/element.d.ts"
17 * @import { OutputElement } from "~/components/output/types.d.ts"
18 *
19 * @import { Container } from "./types.d.ts"
20 */
21
/**
 * Container every collection starts out with, before anything has been
 * loaded: no CID yet, no items, and an inventory without any current or
 * removed ids.
 *
 * This single object is used as the initial value of every container signal,
 * so it must be treated as read-only.
 *
 * @type {Container<any>}
 */
const EMPTY = {
  cid: undefined,
  data: [],
  inventory: { current: {}, removed: [] },
};
28
/**
 * Output transformer that mirrors the wrapped (remote) output into a local
 * `dop-indexed-db` output and keeps the two in sync.
 *
 * Each collection (facets, playlist items, settings, tracks) is stored as a
 * CBOR-encoded `Container`: the items themselves plus an inventory that maps
 * every current item id to the CID of that item and lists the ids that were
 * removed. The CID of the inventory is the container's `cid`, so checking
 * whether local and remote diverged is a single comparison.
 *
 * @extends {OutputTransformer<Uint8Array>}
 */
class DaslBytesSyncOutputTransformer extends OutputTransformer {
  static NAME = "diffuse/transformer/output/bytes/dasl-sync";

  constructor() {
    super();

    // `base()` comes from `OutputTransformer`; presumably the output this
    // transformer wraps (treated as the remote side here) — confirm in base.js.
    const remote = this.base();
    const local = this.#localOutput.get;

    /**
     * Builds the synced container signal for one collection.
     *
     * An effect watches the local and remote byte collections and decides,
     * on every change, which container to expose and which side needs to be
     * written to. Writes are only performed when `isLeader()` resolves to
     * `true`.
     *
     * @template {{ id: string; updatedAt: string }} T
     * @param {string} kind Name of the collection, used in error messages.
     * @param {SignalReader<{ state: "loading" } | { state: "loaded"; data: Uint8Array | undefined }>} localCollection
     * @param {SignalReader<{ state: "loading" } | { state: "loaded"; data: Uint8Array | undefined }>} remoteCollection
     * @param {{ saveLocal: (bytes: Uint8Array) => Promise<void>; saveRemote: (bytes: Uint8Array) => Promise<void> }} sync
     */
    const state = (
      kind,
      localCollection,
      remoteCollection,
      { saveLocal, saveRemote },
    ) => {
      const container = signal(
        /** @type {Container<T>} */ (EMPTY),
        { compare: strictEquality },
      );

      // Flipped to `true` the first time the returned signal is read,
      // so nothing is synced for a collection nobody looks at.
      const isReady = signal(false);

      // `isBusy` pauses the effect while a merge is in flight,
      // `lastCID` is the CID of the most recent merge result.
      const merging = signal({ isBusy: false, lastCID: "" }, {
        compare: diff,
      });

      // Identifies the local/remote pair whose merge failed last.
      // Prevents retrying a failing merge over and over with the same input.
      let failedPair = "";

      this.effect(() => {
        if (!isReady.value) return;
        if (merging.value.isBusy) return;

        const lc = localCollection();

        // The remote collection is only read once the remote is ready.
        const rc = remote.ready() ? remoteCollection() : undefined;

        const lb = lc?.state === "loaded" ? lc.data : undefined;
        const rb = rc?.state === "loaded" ? rc.data : undefined;
        const rs = rc?.state;

        /** @type {Container<T> | undefined} */
        const l = lb ? decode(lb) : undefined;

        /** @type {Container<T> | undefined} */
        const r = rb && rs === "loaded" ? decode(rb) : undefined;

        if (!r) {
          // No remote data (yet): show local data, if any.
          if (l) {
            container.value = l;

            // Remote finished loading but holds nothing: push local to it.
            if (remote.ready() && rs === "loaded") {
              this.isLeader().then((isLeader) => {
                if (!isLeader) return;
                const bytes = this.save(l);
                saveRemote(bytes);
              });
            }
          }
        } else if (!l) {
          // Only remote data: show it and copy it to the local output.
          container.value = r;

          this.isLeader().then((isLeader) => {
            if (!isLeader) return;
            const bytes = this.save(r);
            saveLocal(bytes);
          });
        } else if (
          rs === "loaded" && this.hasDiverged({ local: l, remote: r })
        ) {
          const pair = `${l.cid}|${r.cid}`;

          if (pair === failedPair) {
            // Merging exactly these two containers failed before.
            // Show local data until either side changes.
            container.value = l;
            return;
          }

          // Async merge
          this.isLeader().then(async (isLeader) => {
            if (!isLeader) return;

            const lastCID = merging.value.lastCID;
            merging.value = { isBusy: true, lastCID };

            /** @type {Container<T>} */
            let merged;

            try {
              merged = await this.merge(l, r);
            } catch (err) {
              // Without this the busy flag would stay set forever and the
              // effect would ignore every future local or remote change.
              console.error(
                `Failed to merge the local and remote "${kind}" containers`,
                err,
              );

              failedPair = pair;
              merging.value = { isBusy: false, lastCID };
              return;
            }

            try {
              container.value = merged;

              // Same result as the previous merge, nothing new to store.
              if (merged.cid === lastCID) return;

              const bytes = this.save(merged);

              if (merged.cid !== l.cid) {
                await saveLocal(bytes);
              }

              if (remote.ready() && rs === "loaded" && merged.cid !== r.cid) {
                await saveRemote(bytes);
              }
            } finally {
              merging.value = { isBusy: false, lastCID: merged.cid ?? "" };
            }
          });
        } else {
          // Both sides present and identical.
          container.value = l;
        }
      });

      return computed(() => {
        if (!isReady.get()) isReady.value = true;
        return container.get();
      });
    };

    // Container signals
    const facets = state(
      "facets",
      computed(() => local()?.facets.collection() ?? { state: "loading" }),
      remote.facets.collection,
      {
        saveLocal: async (v) => local()?.facets.save(v),
        saveRemote: remote.facets.save,
      },
    );

    const playlistItems = state(
      "playlistItems",
      computed(() =>
        local()?.playlistItems.collection() ?? { state: "loading" }
      ),
      remote.playlistItems.collection,
      {
        saveLocal: async (v) => local()?.playlistItems.save(v),
        saveRemote: remote.playlistItems.save,
      },
    );

    const settings = state(
      "settings",
      computed(() => local()?.settings.collection() ?? { state: "loading" }),
      remote.settings.collection,
      {
        saveLocal: async (v) => local()?.settings.save(v),
        saveRemote: remote.settings.save,
      },
    );

    const tracks = state(
      "tracks",
      computed(() => local()?.tracks.collection() ?? { state: "loading" }),
      remote.tracks.collection,
      {
        saveLocal: async (v) => local()?.tracks.save(v),
        saveRemote: remote.tracks.save,
      },
    );

    // Output manager
    this.facets = this.managerProp(
      { save: async (v) => local()?.facets.save(v) },
      remote.facets,
      remote.ready,
      facets,
    );

    this.playlistItems = this.managerProp(
      { save: async (v) => local()?.playlistItems.save(v) },
      remote.playlistItems,
      remote.ready,
      playlistItems,
    );

    this.settings = this.managerProp(
      { save: async (v) => local()?.settings.save(v) },
      remote.settings,
      remote.ready,
      settings,
    );

    this.tracks = this.managerProp(
      { save: async (v) => local()?.tracks.save(v) },
      remote.tracks,
      remote.ready,
      tracks,
    );

    // This output always reports itself as ready.
    this.ready = () => true;
  }

  // SIGNALS

  // The rendered `dop-indexed-db` element, set once it has been defined.
  #localOutput = signal(
    /** @type {OutputElement<any> | undefined} */ (undefined),
  );

  // LIFECYCLE

  /**
   * Looks up the rendered `dop-indexed-db` element and exposes it as the
   * local output as soon as that custom element is defined.
   *
   * @override
   * @throws {Error} When the local output element cannot be found.
   */
  async connectedCallback() {
    // Broadcast if needed
    if (this.hasAttribute("group")) {
      this.broadcast(this.identifier, {});
    }

    super.connectedCallback();

    /** @type {OutputElement<any> | null} */
    const local = this.root().querySelector("dop-indexed-db");
    if (!local) throw new Error("Can't find local output");

    customElements.whenDefined(local.localName).then(() => {
      this.#localOutput.value = local;
    });
  }

  // DATA FUNCTIONS

  /**
   * Derives the next container from the previous one and the new list of
   * items.
   *
   * - Ids listed in the previous inventory that are missing from
   *   `collection` are dropped from `current` and added to `removed`.
   * - Items that are new, or whose `updatedAt` differs from the item stored
   *   in the previous container, get a freshly computed CID. Without the
   *   latter the container CID would not change when an existing item is
   *   edited, and the edit would never be noticed by `hasDiverged`.
   * - All other items keep the CID they already had.
   *
   * NOTE(review): an item that was mutated in place (same object as in
   * `previous.data`) cannot be told apart from an unchanged one here.
   *
   * @template {{ id: string; updatedAt: string }} T
   * @param {{ previous: Container<T>, collection: T[] }} _
   * @returns {Promise<Container<T>>}
   */
  async updateContainer({ previous, collection }) {
    const inventory = previous.inventory;

    const currSet = new Set(Object.keys(inventory.current));
    const collSet = new Set(collection.map(({ id }) => id));

    const remSet = currSet.difference(collSet);
    const allRemoved = new Set(inventory.removed).union(remSet);

    const previousItems = new Map(
      previous.data.map((item) => [item.id, item]),
    );

    /** @type {Record<string, string>} */
    const current = { ...inventory.current };

    remSet.forEach((id) => {
      delete current[id];
    });

    await Promise.all(collection.map(async (item) => {
      const before = previousItems.get(item.id);

      const isUnchanged = currSet.has(item.id) &&
        before !== undefined &&
        before.updatedAt === item.updatedAt;

      if (isUnchanged) return;

      // Item is new or was changed, (re)calculate its CID
      current[item.id] = await CID.create(0x71, encode(item));
    }));

    const newInventory = {
      current,
      removed: Array.from(allRemoved),
    };

    return {
      cid: await CID.create(0x71, encode(newInventory)),
      data: collection,
      inventory: newInventory,
    };
  }

  /**
   * Two containers have diverged when their CIDs differ.
   *
   * @template {{ id: string; updatedAt: string }} T
   * @param {{ local: Container<T>, remote: Container<T> }} _
   */
  hasDiverged({ local, remote }) {
    return local.cid !== remote.cid;
  }

  /**
   * Merges two containers into a new one.
   *
   * - An id removed on either side stays removed.
   * - An item present on both sides with the same CID is taken from `a`.
   * - An item present on both sides with different CIDs is resolved through
   *   `updatedAt`: the differences of the newest item are applied on top of
   *   a copy of the older one. When either timestamp is missing, `b` is
   *   treated as the newest.
   * - An item present on one side only is taken as is.
   *
   * @template {{ id: string; updatedAt: string }} T
   * @param {Container<T>} a
   * @param {Container<T>} b
   * @returns {Promise<Container<T>>}
   */
  async merge(a, b) {
    const removedA = new Set(a.inventory.removed);
    const removedB = new Set(b.inventory.removed);
    const allRemoved = removedA.union(removedB);

    const currentA = a.inventory.current;
    const currentB = b.inventory.current;

    const mapA = new Map(a.data.map((item) => [item.id, item]));
    const mapB = new Map(b.data.map((item) => [item.id, item]));

    // Combine all known ids from both sides
    const allIds = new Set([
      ...Object.keys(currentA),
      ...Object.keys(currentB),
    ]);

    /** @type {Record<string, string>} */
    const current = {};

    /** @type {T[]} */
    const data = [];

    // Construct `current` and `data`
    /** @type {Promise<void>[]} */
    const cidPromises = [];

    for (const id of allIds) {
      if (allRemoved.has(id)) continue;

      if (id in currentA && id in currentB) {
        const itemA = mapA.get(id);
        const itemB = mapB.get(id);

        if (!itemA || !itemB) {
          console.warn("Should have found both items but didn't!");
          continue;
        }

        // Items are identical, no merge or CID recomputation needed
        if (currentA[id] === currentB[id]) {
          data.push(itemA);
          current[id] = currentA[id];
          continue;
        }

        // Presumably `compareTimestamps` is positive when its first
        // argument is the later one — confirm in ~/common/temporal.js.
        const isANewerThanB = itemA.updatedAt && itemB.updatedAt
          ? compareTimestamps(itemA.updatedAt, itemB.updatedAt) > 0
          : false;

        const newestItem = isANewerThanB ? itemA : itemB;
        const oldItem = isANewerThanB ? itemB : itemA;

        /** @type {T} */
        const mergedItem = { ...oldItem };

        deepDiff.applyDiff(mergedItem, newestItem);

        data.push(mergedItem);

        cidPromises.push(
          CID.create(0x71, encode(mergedItem)).then((cid) => {
            current[id] = cid;
          }),
        );
      } else {
        // Only one side lists this id
        const item = mapA.get(id) ?? mapB.get(id);

        if (item) {
          data.push(item);
          current[id] = currentA[id] ?? currentB[id];
        }
      }
    }

    await Promise.all(cidPromises);

    // New inventory
    const updatedInventory = { current, removed: Array.from(allRemoved) };

    return {
      cid: await CID.create(0x71, encode(updatedInventory)),
      data,
      inventory: updatedInventory,
    };
  }

  /**
   * Encodes a container into the bytes that get stored.
   *
   * @template {{ id: string; updatedAt: string }} T
   * @param {Container<T>} container
   * @returns {Uint8Array}
   */
  save(container) {
    return encode(container);
  }

  // OUTPUT MANAGER FUNCTIONS

  /**
   * Builds the `{ collection, reload, save }` trio exposed for a collection.
   *
   * `collection` reports "loading" only while the container has no CID yet,
   * the remote is ready and the remote collection is still loading; in every
   * other case it exposes the container's items. `reload` is the remote's
   * `reload`. `save` writes to the local output only.
   *
   * @template {{ id: string; updatedAt: string }} T
   * @param {{ save: (bytes: Uint8Array) => Promise<void> | void }} local
   * @param {{ collection: SignalReader<{ state: "loading" } | { state: "loaded"; data: Uint8Array | undefined }>, reload: () => Promise<void>, save: (bytes: Uint8Array) => Promise<void> }} remote
   * @param {SignalReader<boolean>} remoteReady
   * @param {SignalReader<Container<T>>} container
   * @returns {{ collection: SignalReader<{ state: "loading" } | { state: "loaded"; data: T[] }>, reload: () => Promise<void>, save: (items: T[]) => Promise<void> }}
   */
  managerProp(local, remote, remoteReady, container) {
    return {
      collection: computed(() => {
        const c = container();

        if (
          c.cid === undefined && remoteReady() &&
          remote.collection().state === "loading"
        ) {
          return { state: "loading" };
        }

        return { state: "loaded", data: c.data };
      }),
      reload: remote.reload,
      save: async (/** @type {T[]} */ newItems) => {
        const adjustedContainer = await this.updateContainer({
          collection: newItems,
          previous: container(),
        });

        const bytes = this.save(adjustedContainer);
        await local.save(bytes);
      },
    };
  }

  // RENDER

  /**
   * Renders the local output element, passing on this element's `group`
   * and `namespace` attributes when they are set.
   *
   * @param {RenderArg} _
   */
  render({ html }) {
    return html`
      <dop-indexed-db
        group="${ifDefined(this.getAttribute(`group`))}"
        namespace="${ifDefined(this.getAttribute(`namespace`))}"
      ></dop-indexed-db>
    `;
  }
}
459
export default DaslBytesSyncOutputTransformer;

////////////////////////////////////////////
// REGISTER
////////////////////////////////////////////

// The element class and the tag name it is registered under.
export const CLASS = DaslBytesSyncOutputTransformer;
export const NAME = "dtob-dasl-sync";

defineElement(NAME, CLASS);