A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v4 469 lines 13 kB view raw
import { decode, encode } from "@atcute/cbor";
import { ifDefined } from "lit-html/directives/if-defined.js";
import deepDiff from "@fry69/deep-diff";

import "~/components/output/polymorphic/indexed-db/element.js";

import * as CID from "~/common/cid.js";
import { diff, strictEquality } from "~/common/compare.js";
import { computed, signal } from "~/common/signal.js";
import { compareTimestamps } from "~/common/temporal.js";
import { OutputTransformer } from "../../base.js";
import { defineElement } from "~/common/element.js";

/**
 * @import { SignalReader } from "~/common/signal.d.ts";
 * @import { RenderArg } from "~/common/element.d.ts"
 * @import { OutputElement } from "~/components/output/types.d.ts"
 *
 * @import { Container } from "./types.d.ts"
 */

/**
 * Container used before any local or remote data has been decoded.
 * Its `cid` is `undefined`, which `managerProp` uses to detect "nothing loaded yet".
 *
 * @type {Container<any>}
 */
const EMPTY = {
  cid: undefined,
  data: [],
  inventory: { current: {}, removed: [] },
};

/**
 * Output transformer that keeps a local `dop-indexed-db` output and the
 * wrapped remote output (`this.base()`) in sync.
 *
 * Each collection (facets, playlist items, settings, tracks) is stored as a
 * CBOR-encoded `Container`: the items themselves (`data`), an `inventory`
 * mapping item ids to CIDs plus a list of removed ids, and a `cid` computed
 * from that inventory. When the local and remote container cids differ, the
 * two containers are merged and the result is written back to both sides.
 * Writes triggered by the sync effect are only performed when
 * `this.isLeader()` resolves to `true`.
 *
 * @extends {OutputTransformer<Uint8Array>}
 */
class DaslBytesSyncOutputTransformer extends OutputTransformer {
  static NAME = "diffuse/transformer/output/bytes/dasl-sync";

  constructor() {
    super();

    const remote = this.base();
    const local = this.#localOutput.get;

    /**
     * Builds the synced container signal for a single collection.
     *
     * Registers an effect that decodes the local and remote bytes, picks or
     * merges a container, and persists the outcome. The effect stays dormant
     * until the returned computed signal is read for the first time.
     *
     * @template {{ id: string; updatedAt: string }} T
     * @param {string} kind Collection name, used in error messages.
     * @param {SignalReader<{ state: "loading" } | { state: "loaded"; data: Uint8Array | undefined }>} localCollection
     * @param {SignalReader<{ state: "loading" } | { state: "loaded"; data: Uint8Array | undefined }>} remoteCollection
     * @param {{ saveLocal: (bytes: Uint8Array) => Promise<void>; saveRemote: (bytes: Uint8Array) => Promise<void> }} sync
     */
    const state = (
      kind,
      localCollection,
      remoteCollection,
      { saveLocal, saveRemote },
    ) => {
      const container = signal(
        /** @type {Container<T>} */ (EMPTY),
        { compare: strictEquality },
      );

      const isReady = signal(false);
      const merging = signal({ isBusy: false, lastCID: "" }, {
        compare: diff,
      });

      // Identifies the last local/remote cid pair whose merge threw.
      // Without it, resetting `isBusy` after a failed merge would re-run the
      // effect and retry the very same failing merge over and over.
      let failedMergeKey = "";

      this.effect(() => {
        if (!isReady.value) return;
        if (merging.value.isBusy) return;

        const lc = localCollection();
        const rc = remote.ready() ? remoteCollection() : undefined;

        const lb = lc?.state === "loaded" ? lc.data : undefined;
        const rb = rc?.state === "loaded" ? rc.data : undefined;
        const rs = rc?.state;

        /** @type {Container<T> | undefined} */
        const l = lb ? decode(lb) : undefined;

        /** @type {Container<T> | undefined} */
        const r = rb && rs === "loaded" ? decode(rb) : undefined;

        if (!r) {
          // No remote container: use local data (if any) and seed the remote
          // once it is ready and has finished loading.
          if (l) {
            container.value = l;

            if (remote.ready() && rs === "loaded") {
              this.isLeader().then((isLeader) => {
                if (!isLeader) return;
                const bytes = this.save(l);
                saveRemote(bytes);
              });
            }
          }
        } else if (!l) {
          // Remote container only: adopt it and copy it to local storage.
          container.value = r;

          this.isLeader().then((isLeader) => {
            if (!isLeader) return;
            const bytes = this.save(r);
            saveLocal(bytes);
          });
        } else if (
          rs === "loaded" && this.hasDiverged({ local: l, remote: r })
        ) {
          const mergeKey = `${l.cid}|${r.cid}`;

          if (mergeKey === failedMergeKey) {
            // This exact pair already failed to merge; show the local data
            // and wait for either side to change before trying again.
            container.value = l;
            return;
          }

          // Async merge
          this.isLeader().then(async (isLeader) => {
            if (!isLeader) return;

            merging.value = { isBusy: true, lastCID: merging.value.lastCID };

            /** @type {Container<T>} */
            let c;

            try {
              c = await this.merge(l, r);
            } catch (err) {
              // A rejected merge must release the busy flag, otherwise the
              // effect returns early forever and this collection never
              // syncs again.
              failedMergeKey = mergeKey;
              console.error(`Failed to merge "${kind}" containers`, err);
              merging.value = {
                isBusy: false,
                lastCID: merging.value.lastCID,
              };
              return;
            }

            failedMergeKey = "";

            try {
              container.value = c;

              // Same result as the previous merge: nothing new to persist.
              if (c.cid === merging.value.lastCID) return;

              const bytes = this.save(c);

              if (c.cid !== l.cid) {
                await saveLocal(bytes);
              }

              if (remote.ready() && rs === "loaded" && c.cid !== r.cid) {
                await saveRemote(bytes);
              }
            } finally {
              merging.value = { isBusy: false, lastCID: c.cid ?? "" };
            }
          });
        } else {
          container.value = l;
        }
      });

      return computed(() => {
        // First read activates the sync effect above.
        if (!isReady.get()) isReady.value = true;
        return container.get();
      });
    };

    // Container signals
    const facets = state(
      "facets",
      computed(() => local()?.facets.collection() ?? { state: "loading" }),
      remote.facets.collection,
      {
        saveLocal: async (v) => local()?.facets.save(v),
        saveRemote: remote.facets.save,
      },
    );

    const playlistItems = state(
      "playlistItems",
      computed(() =>
        local()?.playlistItems.collection() ?? { state: "loading" }
      ),
      remote.playlistItems.collection,
      {
        saveLocal: async (v) => local()?.playlistItems.save(v),
        saveRemote: remote.playlistItems.save,
      },
    );

    const settings = state(
      "settings",
      computed(() => local()?.settings.collection() ?? { state: "loading" }),
      remote.settings.collection,
      {
        saveLocal: async (v) => local()?.settings.save(v),
        saveRemote: remote.settings.save,
      },
    );

    const tracks = state(
      "tracks",
      computed(() => local()?.tracks.collection() ?? { state: "loading" }),
      remote.tracks.collection,
      {
        saveLocal: async (v) => local()?.tracks.save(v),
        saveRemote: remote.tracks.save,
      },
    );

    // Output manager
    this.facets = this.managerProp(
      { save: async (v) => local()?.facets.save(v) },
      remote.facets,
      remote.ready,
      facets,
    );

    this.playlistItems = this.managerProp(
      { save: async (v) => local()?.playlistItems.save(v) },
      remote.playlistItems,
      remote.ready,
      playlistItems,
    );

    this.settings = this.managerProp(
      { save: async (v) => local()?.settings.save(v) },
      remote.settings,
      remote.ready,
      settings,
    );

    this.tracks = this.managerProp(
      { save: async (v) => local()?.tracks.save(v) },
      remote.tracks,
      remote.ready,
      tracks,
    );

    this.ready = () => true;
  }

  // SIGNALS

  // Holds the rendered `dop-indexed-db` element once it has been defined.
  #localOutput = signal(
    /** @type {OutputElement<any> | undefined} */ (undefined),
  );

  // LIFECYCLE

  /**
   * Broadcasts when a `group` attribute is present, then looks up the
   * rendered `dop-indexed-db` element and exposes it as the local output
   * once its custom element definition is available.
   *
   * @override
   * @throws {Error} When the `dop-indexed-db` element cannot be found.
   */
  async connectedCallback() {
    // Broadcast if needed
    if (this.hasAttribute("group")) {
      this.broadcast(this.identifier, {});
    }

    super.connectedCallback();

    /** @type {OutputElement<any> | null} */
    const local = this.root().querySelector("dop-indexed-db");
    if (!local) throw new Error("Can't find local output");

    customElements.whenDefined(local.localName).then(() => {
      this.#localOutput.value = local;
    });
  }

  // DATA FUNCTIONS

  /**
   * Derives a new container from `previous` for the given `collection`.
   *
   * Ids that disappeared from the collection are moved to the inventory's
   * `removed` list; ids that were not in the inventory yet get a CID computed
   * from their CBOR encoding.
   *
   * NOTE(review): items whose id is already in the inventory keep their
   * previous CID even when their content changed, and ids already listed in
   * `removed` stay there when they reappear in `collection` — confirm both
   * are intended.
   *
   * @template {{ id: string; updatedAt: string }} T
   * @param {{ previous: Container<T>, collection: T[] }} _
   * @returns {Promise<Container<T>>}
   */
  async updateContainer({ previous, collection }) {
    const inventory = previous.inventory;

    const collIds = collection.map(({ id }) => id);

    const currSet = new Set(Object.keys(inventory.current));
    const collSet = new Set(collIds);

    const newSet = collSet.difference(currSet);
    const remSet = currSet.difference(collSet);

    const alreadyRemoved = new Set(inventory.removed);
    const allRemoved = alreadyRemoved.union(remSet);

    /** @type {Record<string, string>} */
    const current = { ...inventory.current };

    remSet.forEach((id) => {
      delete current[id];
    });

    /** @type Promise<void>[] */
    const promises = [];

    collection.forEach((a) => {
      if (!newSet.has(a.id)) return;

      // Item is new, calculate CID and add it to the `current` dictionary
      const encoded = encode(a);

      promises.push((async () => {
        const cid = await CID.create(0x71, encoded);
        current[a.id] = cid;
      })());
    });

    await Promise.all(promises);

    const newInventory = {
      current,
      removed: Array.from(allRemoved),
    };

    return {
      cid: await CID.create(0x71, encode(newInventory)),
      data: collection,
      inventory: newInventory,
    };
  }

  /**
   * Two containers have diverged when their cids differ.
   *
   * @template {{ id: string; updatedAt: string }} T
   * @param {{ local: Container<T>, remote: Container<T> }} _
   * @returns {boolean}
   */
  hasDiverged({ local, remote }) {
    return local.cid !== remote.cid;
  }

  /**
   * Merges two containers into a new one.
   *
   * - Ids listed as removed on either side are dropped.
   * - Ids present in both inventories with the same CID are taken as-is.
   * - Ids present in both inventories with different CIDs are merged by
   *   applying the newer item (by `updatedAt`) on top of a copy of the older
   *   one; `b` counts as newer when a timestamp is missing or when `a` is
   *   not strictly newer.
   * - Ids present on one side only are carried over unchanged.
   *
   * @template {{ id: string; updatedAt: string }} T
   * @param {Container<T>} a
   * @param {Container<T>} b
   * @returns {Promise<Container<T>>}
   */
  async merge(a, b) {
    const removedA = new Set(a.inventory.removed);
    const removedB = new Set(b.inventory.removed);
    const allRemoved = removedA.union(removedB);

    const currentA = a.inventory.current;
    const currentB = b.inventory.current;

    const mapA = new Map(a.data.map((item) => [item.id, item]));
    const mapB = new Map(b.data.map((item) => [item.id, item]));

    // Combine all known ids from both sides
    const allIds = new Set([
      ...Object.keys(currentA),
      ...Object.keys(currentB),
    ]);

    /** @type {Record<string, string>} */
    const current = {};

    /** @type {T[]} */
    const data = [];

    // Construct `current` and `data`
    /** @type {Promise<void>[]} */
    const cidPromises = [];

    for (const id of allIds) {
      if (allRemoved.has(id)) continue;

      // Own-property checks only: `in` would also match inherited names such
      // as "constructor" or "toString" when they are used as ids.
      const inA = Object.hasOwn(currentA, id);
      const inB = Object.hasOwn(currentB, id);

      if (inA && inB) {
        const itemA = mapA.get(id);
        const itemB = mapB.get(id);

        if (!itemA || !itemB) {
          console.warn("Should have found both items but didn't!");
          continue;
        }

        // Items are identical, no merge or CID recomputation needed
        if (currentA[id] === currentB[id]) {
          data.push(itemA);
          current[id] = currentA[id];
          continue;
        }

        const isANewerThanB = itemA.updatedAt && itemB.updatedAt
          ? compareTimestamps(itemA.updatedAt, itemB.updatedAt) > 0
          : false;

        const newestItem = isANewerThanB ? itemA : itemB;
        const oldItem = isANewerThanB ? itemB : itemA;

        // Shallow copy of the older item, updated in place with the newer one
        /** @type {T} */
        const mergedItem = { ...oldItem };

        deepDiff.applyDiff(mergedItem, newestItem);

        data.push(mergedItem);

        cidPromises.push(
          CID.create(0x71, encode(mergedItem)).then((cid) => {
            current[id] = cid;
          }),
        );
      } else {
        // Known to one side only (ids come from the union of own keys)
        const item = mapA.get(id) ?? mapB.get(id);

        if (item) {
          data.push(item);
          current[id] = inA ? currentA[id] : currentB[id];
        }
      }
    }

    await Promise.all(cidPromises);

    // New inventory
    const updatedInventory = { current, removed: Array.from(allRemoved) };

    return {
      cid: await CID.create(0x71, encode(updatedInventory)),
      data,
      inventory: updatedInventory,
    };
  }

  /**
   * Serialises a container to CBOR bytes.
   *
   * @template {{ id: string; updatedAt: string }} T
   * @param {Container<T>} container
   * @returns {Uint8Array}
   */
  save(container) {
    return encode(container);
  }

  // OUTPUT MANAGER FUNCTIONS

  /**
   * Builds the `{ collection, reload, save }` manager for one collection.
   *
   * `collection` reports `loading` only while no container has been adopted
   * yet (its `cid` is `undefined`) and the ready remote is still loading.
   * `save` derives a new container from the current one and writes it to the
   * local output only.
   *
   * @template {{ id: string; updatedAt: string }} T
   * @param {{ save: (bytes: Uint8Array) => Promise<void> | void }} local
   * @param {{ collection: SignalReader<{ state: "loading" } | { state: "loaded"; data: Uint8Array | undefined }>, reload: () => Promise<void>, save: (bytes: Uint8Array) => Promise<void> }} remote
   * @param {SignalReader<boolean>} remoteReady
   * @param {SignalReader<Container<T>>} container
   * @returns {{ collection: SignalReader<{ state: "loading" } | { state: "loaded"; data: T[] }>, reload: () => Promise<void>, save: (items: T[]) => Promise<void> }}
   */
  managerProp(local, remote, remoteReady, container) {
    return {
      collection: computed(() => {
        const c = container();

        if (
          c.cid === undefined &&
          remoteReady() &&
          remote.collection().state === "loading"
        ) {
          return { state: "loading" };
        }

        return { state: "loaded", data: c.data };
      }),
      reload: remote.reload,
      save: async (/** @type {T[]} */ newItems) => {
        const adjustedContainer = await this.updateContainer({
          collection: newItems,
          previous: container(),
        });

        const bytes = this.save(adjustedContainer);
        await local.save(bytes);
      },
    };
  }

  // RENDER

  /**
   * Renders the local `dop-indexed-db` output, forwarding this element's
   * `group` and `namespace` attributes.
   *
   * @param {RenderArg} _
   */
  render({ html }) {
    return html`
      <dop-indexed-db
        group="${ifDefined(this.getAttribute(`group`))}"
        namespace="${ifDefined(this.getAttribute(`namespace`))}"
      ></dop-indexed-db>
    `;
  }
}

export default DaslBytesSyncOutputTransformer;

////////////////////////////////////////////
// REGISTER
////////////////////////////////////////////

export const CLASS = DaslBytesSyncOutputTransformer;
export const NAME = "dtob-dasl-sync";

defineElement(NAME, CLASS);