A music player that connects to your cloud/distributed storage.
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: atproto local-first sync

+410 -16
+2 -2
src/common/worker/rpc-channel.js
··· 135 135 this.#port.postMessage({ __rpc: true, id, type: "response", result }); 136 136 }, 137 137 (err) => { 138 - console.error(err) 138 + console.error(err); 139 139 this.#port.postMessage({ 140 140 __rpc: true, 141 141 id, ··· 145 145 }, 146 146 ); 147 147 } catch (err) { 148 - console.error(err) 148 + console.error(err); 149 149 this.#port.postMessage({ 150 150 __rpc: true, 151 151 id,
+11 -11
src/components/orchestrator/output/element.js
··· 6 6 import "@components/output/polymorphic/indexed-db/element.js"; 7 7 import "@components/output/raw/atproto/element.js"; 8 8 import "@components/transformer/output/bytes/automerge/element.js"; 9 + import "@components/transformer/output/raw/atproto-sync/element.js"; 9 10 import "@components/transformer/output/refiner/default/element.js"; 10 11 import "@components/transformer/output/replicator/broadcast/element.js"; 11 12 import "@components/transformer/output/string/json/element.js"; ··· 103 104 const group = this.group === DEFAULT_GROUP ? undefined : this.group; 104 105 105 106 return html` 106 - <!-- IDB-ONLY #2 --> 107 107 <dop-indexed-db 108 108 id="do-output__dop-indexed-db__json" 109 109 namespace="json" 110 110 ></dop-indexed-db> 111 111 112 - <!-- S3 #2 --> 113 - <dob-s3 114 - id="do-output__dob-s3" 115 - ></dob-s3> 112 + <dob-s3 id="do-output__dob-s3"></dob-s3> 113 + <dor-atproto id="do-output__dor-atproto"></dor-atproto> 116 114 117 115 <!-- OUTPUT CONFIGURATOR --> 118 116 <dc-output ··· 120 118 default="do-output__dc-output__local" 121 119 group="${ifDefined(group)}" 122 120 > 123 - <!-- IDB-ONLY #1 --> 121 + <!-- Local --> 124 122 <dtos-json 125 123 id="do-output__dc-output__local" 126 124 output-selector="#do-output__dop-indexed-db__json" ··· 128 126 ></dtos-json> 129 127 130 128 <!-- ATProto --> 131 - <dor-atproto 129 + <dtor-atproto-sync 132 130 id="do-output__dc-output__atproto" 131 + namespace="atproto" 132 + output-selector="#do-output__dor-atproto" 133 133 label="AT Protocol" 134 - ></dor-atproto> 134 + ></dtor-atproto-sync> 135 135 136 - <!-- S3 #1 --> 136 + <!-- S3 --> 137 137 <dtob-automerge 138 138 id="do-output__dc-output__s3" 139 139 namespace="s3" ··· 142 142 ></dtob-automerge> 143 143 </dc-output> 144 144 145 - <!-- Refiner --> 145 + <!-- REFINER --> 146 146 <dtor-default 147 147 id="do-output__dtor-default" 148 148 output-selector="#do-output__dc-output" 149 149 ></dtor-default> 150 150 151 - <!-- Entry ⬆️ --> 151 + <!-- ENTRY ⬆️ --> 152 152 <dtor-broadcast 153 153 id="do-output__output" 154 154 output-selector="#do-output__dtor-default"
+38 -3
src/components/output/raw/atproto/element.js
··· 13 13 } from "./oauth.js"; 14 14 15 15 /** 16 - * @import {Signal} from "@common/signal.d.ts" 17 16 * @import {OutputManager} from "../../types.d.ts" 18 17 * @import {ATProtoOutputElement} from "./types.d.ts" 19 18 */ ··· 76 75 77 76 #did = signal(/** @type {string | null} */ (null)); 78 77 #isOnline = signal(navigator.onLine); 78 + #rev = signal(/** @type {string | null} */ (null)); 79 79 80 80 // STATE 81 81 82 82 did = this.#did.get; 83 + rev = this.#rev.get; 83 84 84 85 ready = computed(() => { 85 86 return this.#did.value !== null && this.#isOnline.value; ··· 207 208 // RECORDS 208 209 209 210 /** 211 + * Fetch the latest commit rev for this repo. 212 + * Returns `null` if not authenticated or on error. 213 + * 214 + * @returns {Promise<string | null>} 215 + */ 216 + async getLatestCommit() { 217 + const did = this.#did.value; 218 + if (!this.#rpc || !did) return null; 219 + 220 + try { 221 + /** @type {any} */ 222 + const result = await ok(this.#rpc.get( 223 + "com.atproto.sync.getLatestCommit", 224 + { params: { did } }, 225 + )); 226 + 227 + this.#rev.value = result.rev; 228 + return result.rev; 229 + } catch (err) { 230 + if (this.#isSessionError(err)) { 231 + this.#clearSession(); 232 + return null; 233 + } 234 + 235 + throw err; 236 + } 237 + } 238 + 239 + /** 210 240 * @template T 211 241 * @param {string} collection 212 242 * @param {string} [did] ··· 315 345 316 346 // 4. Apply 317 347 if (writes.length > 0) { 318 - await this.#rpc.post("com.atproto.repo.applyWrites", { 348 + /** @type {any} */ 349 + const result = await ok(this.#rpc.post("com.atproto.repo.applyWrites", { 319 350 input: { repo: this.#did.value, writes }, 320 - }); 351 + })); 352 + 353 + if (result?.commit?.rev) { 354 + this.#rev.value = result.commit.rev; 355 + } 321 356 } 322 357 } catch (err) { 323 358 if (this.#isSessionError(err)) {
+2
src/components/output/raw/atproto/types.d.ts
··· 5 5 & OutputElement 6 6 & { 7 7 did: SignalReader<string | null>; 8 + rev: SignalReader<string | null>; 9 + getLatestCommit(): Promise<string | null>; 8 10 login(handle: string): Promise<void>; 9 11 logout(): Promise<void>; 10 12 };
+357
src/components/transformer/output/raw/atproto-sync/element.js
··· 1 + import { ifDefined } from "lit-html/directives/if-defined.js"; 2 + 3 + import "@components/output/polymorphic/indexed-db/element.js"; 4 + 5 + import { computed, signal } from "@common/signal.js"; 6 + import { OutputTransformer } from "../../base.js"; 7 + 8 + /** 9 + * @import { RenderArg } from "@common/element.d.ts" 10 + * @import { OutputElement } from "@components/output/types.d.ts" 11 + * @import { ATProtoOutputElement } from "@components/output/raw/atproto/types.d.ts" 12 + */ 13 + 14 + const COLLECTIONS = /** @type {const} */ ([ 15 + "facets", 16 + "playlistItems", 17 + "themes", 18 + "tracks", 19 + ]); 20 + 21 + const STORAGE_PREFIX = "diffuse/transformer/output/atproto-sync"; 22 + 23 + /** 24 + * Wraps an AT Protocol output with a local IndexedDB cache. 25 + * 26 + * Uses the repo `rev` (revision) from the AT Protocol to skip 27 + * unnecessary fetches when nothing changed remotely. 28 + * 29 + * When both local and remote have diverged, performs a union merge 30 + * by record `id`: records from both sides are combined, with 31 + * `updatedAt` used as a tiebreaker for conflicts on the same id 32 + * (falling back to local wins). 33 + * 34 + * Maintains a per-collection tombstone set of deleted record ids 35 + * so that records deleted on one side are not re-introduced by 36 + * the other during merge. 37 + * 38 + * @extends {OutputTransformer<null>} 39 + */ 40 + class ATProtoOutputSyncTransformer extends OutputTransformer { 41 + constructor() { 42 + super(); 43 + 44 + const remote = this.base(); 45 + const local = this.#localOutput.get; 46 + 47 + for (const name of COLLECTIONS) { 48 + /** @ts-ignore */ 49 + this[name] = { 50 + collection: computed(() => { 51 + const l = local(); 52 + if (!l) return []; 53 + const data = l[name].collection(); 54 + return Array.isArray(data) ? data : []; 55 + }), 56 + reload: async () => { 57 + await this.#sync(); 58 + }, 59 + save: async (/** @type {any} */ newData) => { 60 + const l = local(); 61 + if (!l) return; 62 + 63 + // Track deletions: any id present in local but absent in 64 + // newData has been deleted by the user. 65 + const oldData = l[name].collection(); 66 + if (Array.isArray(oldData)) { 67 + const newIds = new Set(newData.map((/** @type {any} */ r) => r.id)); 68 + for (const record of oldData) { 69 + if (!newIds.has(record.id)) { 70 + this.#addTombstone(name, record.id); 71 + } 72 + } 73 + } 74 + 75 + // Update known ids 76 + this.#trackIds(name, newData); 77 + 78 + await l[name].save(newData); 79 + 80 + if (remote.ready()) { 81 + await remote[name].save(newData); 82 + const rev = this.#atproto()?.rev(); 83 + if (rev) this.#storeRev(rev); 84 + this.#clearDirty(); 85 + } else { 86 + this.#markDirty(); 87 + } 88 + }, 89 + state: computed(() => local()?.[name].state() ?? "sleeping"), 90 + }; 91 + } 92 + 93 + this.ready = () => true; 94 + 95 + // Sync when remote becomes ready 96 + this.effect(() => { 97 + const l = local(); 98 + if (!l) return; 99 + 100 + this.effect(() => { 101 + if (!remote.ready()) return; 102 + this.#sync(); 103 + }); 104 + }); 105 + } 106 + 107 + // SIGNALS 108 + 109 + #localOutput = signal( 110 + /** @type {OutputElement<any> | undefined} */ (undefined), 111 + ); 112 + 113 + #syncing = false; 114 + 115 + /** 116 + * @returns {ATProtoOutputElement | undefined} 117 + */ 118 + #atproto() { 119 + return /** @type {any} */ (this.output.signal()); 120 + } 121 + 122 + // SYNC 123 + 124 + async #sync() { 125 + if (this.#syncing) return; 126 + this.#syncing = true; 127 + 128 + try { 129 + const l = this.#localOutput.get(); 130 + const remote = this.base(); 131 + const atproto = this.#atproto(); 132 + if (!l || !atproto || !remote.ready()) return; 133 + 134 + const remoteRev = await atproto.getLatestCommit(); 135 + if (!remoteRev) return; 136 + 137 + const localRev = this.#getStoredRev(); 138 + const dirty = this.#isDirty(); 139 + 140 + if (localRev === remoteRev && !dirty) { 141 + return; 142 + } 143 + 144 + // Fetch remote data 145 + for (const name of COLLECTIONS) { 146 + await remote[name].reload(); 147 + } 148 + 149 + const localHasData = COLLECTIONS.some((name) => { 150 + const data = l[name].collection(); 151 + return Array.isArray(data) && data.length > 0; 152 + }); 153 + 154 + if (!localHasData && !dirty) { 155 + // Local is empty and clean — just pull remote 156 + for (const name of COLLECTIONS) { 157 + const remoteData = remote[name].collection(); 158 + if (Array.isArray(remoteData) && remoteData.length > 0) { 159 + this.#trackIds(name, remoteData); 160 + await l[name].save(remoteData); 161 + } 162 + } 163 + } else { 164 + // Union merge 165 + for (const name of COLLECTIONS) { 166 + const localData = l[name].collection(); 167 + const remoteData = remote[name].collection(); 168 + const localArr = Array.isArray(localData) ? localData : []; 169 + const remoteArr = Array.isArray(remoteData) ? remoteData : []; 170 + 171 + const merged = this.#mergeRecords(name, localArr, remoteArr); 172 + 173 + this.#trackIds(name, merged); 174 + await l[name].save(merged); 175 + await remote[name].save(merged); 176 + } 177 + } 178 + 179 + this.#storeRev(atproto.rev()); 180 + this.#clearDirty(); 181 + } finally { 182 + this.#syncing = false; 183 + } 184 + } 185 + 186 + /** 187 + * Union merge two record arrays by `id`. 188 + * 189 + * - Records only in local → keep (unless tombstoned) 190 + * - Records only in remote → keep (unless tombstoned) 191 + * - Records in both → pick the one with the later `updatedAt`, 192 + * falling back to local wins 193 + * - Records whose id is in the tombstone set are excluded 194 + * 195 + * @template {Record<string, any> & { id: string }} T 196 + * @param {string} collection 197 + * @param {T[]} localArr 198 + * @param {T[]} remoteArr 199 + * @returns {T[]} 200 + */ 201 + #mergeRecords(collection, localArr, remoteArr) { 202 + const tombstones = this.#getTombstones(collection); 203 + const knownIds = this.#getKnownIds(collection); 204 + 205 + /** @type {Map<string, T>} */ 206 + const merged = new Map(); 207 + 208 + // Start with local records 209 + for (const record of localArr) { 210 + if (!tombstones.has(record.id)) { 211 + merged.set(record.id, record); 212 + } 213 + } 214 + 215 + // Merge remote records 216 + for (const record of remoteArr) { 217 + if (tombstones.has(record.id)) continue; 218 + 219 + // If this id was previously known but is absent from local, 220 + // it was deleted locally — skip it. 221 + if (knownIds.has(record.id) && !merged.has(record.id)) continue; 222 + 223 + const existing = merged.get(record.id); 224 + 225 + if (!existing) { 226 + merged.set(record.id, record); 227 + } else { 228 + // Both sides have this record — pick by updatedAt 229 + const lt = existing.updatedAt; 230 + const rt = record.updatedAt; 231 + if (lt && rt && rt > lt) { 232 + merged.set(record.id, record); 233 + } 234 + } 235 + } 236 + 237 + return [...merged.values()]; 238 + } 239 + 240 + // TOMBSTONES & KNOWN IDS 241 + 242 + /** 243 + * @param {string} collection 244 + * @returns {Set<string>} 245 + */ 246 + #getTombstones(collection) { 247 + const raw = localStorage.getItem( 248 + `${STORAGE_PREFIX}/tombstones/${collection}`, 249 + ); 250 + return raw ? new Set(JSON.parse(raw)) : new Set(); 251 + } 252 + 253 + /** 254 + * @param {string} collection 255 + * @param {string} id 256 + */ 257 + #addTombstone(collection, id) { 258 + const tombstones = this.#getTombstones(collection); 259 + tombstones.add(id); 260 + localStorage.setItem( 261 + `${STORAGE_PREFIX}/tombstones/${collection}`, 262 + JSON.stringify([...tombstones]), 263 + ); 264 + } 265 + 266 + /** 267 + * @param {string} collection 268 + * @returns {Set<string>} 269 + */ 270 + #getKnownIds(collection) { 271 + const raw = localStorage.getItem(`${STORAGE_PREFIX}/known/${collection}`); 272 + return raw ? new Set(JSON.parse(raw)) : new Set(); 273 + } 274 + 275 + /** 276 + * Record all ids from a data array as known. 277 + * 278 + * @param {string} collection 279 + * @param {Array<{ id: string }>} data 280 + */ 281 + #trackIds(collection, data) { 282 + const known = this.#getKnownIds(collection); 283 + for (const record of data) { 284 + known.add(record.id); 285 + } 286 + localStorage.setItem( 287 + `${STORAGE_PREFIX}/known/${collection}`, 288 + JSON.stringify([...known]), 289 + ); 290 + } 291 + 292 + // REV & DIRTY FLAG 293 + 294 + /** @returns {string | null} */ 295 + #getStoredRev() { 296 + return localStorage.getItem(`${STORAGE_PREFIX}/rev`); 297 + } 298 + 299 + /** @param {string | null} rev */ 300 + #storeRev(rev) { 301 + if (rev) { 302 + localStorage.setItem(`${STORAGE_PREFIX}/rev`, rev); 303 + } 304 + } 305 + 306 + #markDirty() { 307 + localStorage.setItem(`${STORAGE_PREFIX}/dirty`, "1"); 308 + } 309 + 310 + #clearDirty() { 311 + localStorage.removeItem(`${STORAGE_PREFIX}/dirty`); 312 + } 313 + 314 + /** @returns {boolean} */ 315 + #isDirty() { 316 + return localStorage.getItem(`${STORAGE_PREFIX}/dirty`) === "1"; 317 + } 318 + 319 + // LIFECYCLE 320 + 321 + /** @override */ 322 + connectedCallback() { 323 + super.connectedCallback(); 324 + 325 + /** @type {OutputElement<any> | null} */ 326 + const local = this.root().querySelector("dop-indexed-db"); 327 + if (!local) throw new Error("Can't find local output"); 328 + 329 + customElements.whenDefined(local.localName).then(() => { 330 + this.#localOutput.value = local; 331 + }); 332 + } 333 + 334 + // RENDER 335 + 336 + /** 337 + * @param {RenderArg} _ 338 + */ 339 + render({ html }) { 340 + return html` 341 + <dop-indexed-db 342 + namespace="${ifDefined(this.getAttribute(`namespace`))}" 343 + ></dop-indexed-db> 344 + `; 345 + } 346 + } 347 + 348 + export default ATProtoOutputSyncTransformer; 349 + 350 + //////////////////////////////////////////// 351 + // REGISTER 352 + //////////////////////////////////////////// 353 + 354 + export const CLASS = ATProtoOutputSyncTransformer; 355 + export const NAME = "dtor-atproto-sync"; 356 + 357 + customElements.define(NAME, CLASS);