A music player that connects to your cloud/distributed storage.
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

chore: atproto sync improvements

+113 -56
+100 -48
src/components/output/raw/atproto/element.js
··· 27 27 // ELEMENT 28 28 //////////////////////////////////////////// 29 29 30 + const WRITE_WINDOW_MS = 3_600_000; 31 + const WRITE_RATE_LIMIT = 1500; 32 + const WRITE_IDB_KEY = "diffuse/output/raw/atproto/writes"; 33 + 30 34 /** @type {Set<string>} */ 31 35 const WATCHED_COLLECTIONS = new Set([ 32 36 "sh.diffuse.output.facet", ··· 129 133 #rev = signal(/** @type {string | null} */ (null)); 130 134 #revFetchedAt = 0; 131 135 #writing = 0; 136 + 137 + /** @type {Array<{ fn: () => Promise<void>, resolve: () => void, reject: (err: unknown) => void }>} */ 138 + #writeQueue = []; 139 + #writeDraining = false; 140 + /** @type {Map<string, { cancelled: boolean }>} */ 141 + #writeCancels = new Map(); 132 142 133 143 did = this.#did.get; 134 144 rev = this.#rev.get; ··· 439 449 } 440 450 } 441 451 452 + // WRITE QUEUE 453 + 454 + /** @returns {Promise<{ id: string, ts: number }[]>} */ 455 + async #loadWriteWindow() { 456 + const now = Date.now(); 457 + const all = /** @type {{ id: string, ts: number }[]} */ ( 458 + await IDB.get(WRITE_IDB_KEY) ?? [] 459 + ); 460 + return all.filter((e) => now - e.ts < WRITE_WINDOW_MS); 461 + } 462 + 463 + /** @param {string[]} ids */ 464 + async #recordWritten(ids) { 465 + const now = Date.now(); 466 + const window = await this.#loadWriteWindow(); 467 + await IDB.set(WRITE_IDB_KEY, [ 468 + ...window, 469 + ...ids.map((id) => ({ id, ts: now })), 470 + ]); 471 + } 472 + 473 + /** 474 + * @param {() => Promise<void>} fn 475 + * @returns {Promise<void>} 476 + */ 477 + #enqueueWrite(fn) { 478 + return new Promise((resolve, reject) => { 479 + this.#writeQueue.push({ fn, resolve, reject }); 480 + this.#drainWrites(); 481 + }); 482 + } 483 + 484 + async #drainWrites() { 485 + if (this.#writeDraining) return; 486 + this.#writeDraining = true; 487 + 488 + while (this.#writeQueue.length > 0) { 489 + const { fn, resolve, reject } = /** @type {NonNullable<typeof this.#writeQueue[0]>} */ ( 490 + this.#writeQueue.shift() 491 + ); 492 + try { 493 + await fn(); 494 + resolve(); 495 + } catch (err) { 496 + reject(err); 497 + } 498 + } 499 + 500 + this.#writeDraining = false; 501 + } 502 + 442 503 /** 443 504 * @param {string} collection 444 505 * @param {Array<{ id: string }>} data ··· 449 510 data, 450 511 { deleteBatchSize = 100, upsertBatchSize = deleteBatchSize } = {}, 451 512 ) { 513 + if (!this.#rpc || !this.#did.value) return; 514 + 515 + // Supersede any prior write for this collection 516 + const prior = this.#writeCancels.get(collection); 517 + if (prior) prior.cancelled = true; 518 + 519 + const token = { cancelled: false }; 520 + this.#writeCancels.set(collection, token); 521 + 522 + return this.#enqueueWrite(async () => { 523 + if (token.cancelled) return; 524 + try { 525 + await this.#doPutRecords(collection, data, { deleteBatchSize, upsertBatchSize }, token); 526 + } finally { 527 + if (this.#writeCancels.get(collection) === token) { 528 + this.#writeCancels.delete(collection); 529 + } 530 + } 531 + }); 532 + } 533 + 534 + /** 535 + * @param {string} collection 536 + * @param {Array<{ id: string }>} data 537 + * @param {{ deleteBatchSize: number, upsertBatchSize: number }} options 538 + * @param {{ cancelled: boolean }} token 539 + */ 540 + async #doPutRecords(collection, data, { deleteBatchSize, upsertBatchSize }, token) { 452 541 const rpc = this.#rpc; 453 542 const did = this.#did.value; 454 - 455 543 if (!rpc || !did) return; 456 544 457 545 this.#writing++; ··· 512 600 } 513 601 } 514 602 515 - // 4. Apply in batches, throttled to 1500 ops/hour via a persisted sliding window 516 - const WINDOW_MS = 3_600_000; 517 - const RATE_LIMIT = 1500; 518 - 519 - const IDB_KEY = "diffuse/output/raw/atproto/writes"; 520 - 521 - /** 522 - * Returns all record IDs written within the last hour, loaded from IDB. 523 - * 524 - * @returns {Promise<{ id: string, ts: number }[]>} 525 - */ 526 - const loadWindow = async () => { 527 - const now = Date.now(); 528 - const all = await IDB.get(IDB_KEY) ?? []; 529 - return all.filter( 530 - /** 531 - * @param {{ id: string, ts: number }} entry 532 - * @returns {boolean} 533 - */ 534 - (entry) => now - entry.ts < WINDOW_MS, 535 - ); 536 - }; 537 - 538 - /** 539 - * Records IDs as written, pruning entries older than one hour. 540 - * 541 - * @param {string[]} ids 542 - */ 543 - const recordWritten = async (ids) => { 544 - const now = Date.now(); 545 - const window = await loadWindow(); 546 - await IDB.set(IDB_KEY, [ 547 - ...window, 548 - ...ids.map((id) => ({ id, ts: now })), 549 - ]); 550 - }; 551 - 603 + // 4. Apply batches, throttled to WRITE_RATE_LIMIT ops/hour. 604 + // The write queue ensures we are the only writer, so one precise sleep 605 + // is enough — no need to re-check in a loop. 552 606 const applyBatch = async (/** @type {any[]} */ batch) => { 553 - // Wait until the sliding window has room for this batch 554 - while (true) { 555 - const window = await loadWindow(); 607 + const window = await this.#loadWriteWindow(); 556 608 557 - if (window.length + batch.length <= RATE_LIMIT) break; 558 - 559 - // Wait until the oldest entry in the window expires 560 - const oldest = window.reduce((a, b) => a.ts < b.ts ? a : b); 561 - const waitMs = WINDOW_MS - (Date.now() - oldest.ts) + 1; 609 + if (window.length + batch.length > WRITE_RATE_LIMIT) { 610 + const needed = window.length + batch.length - WRITE_RATE_LIMIT; 611 + const sorted = [...window].sort((a, b) => a.ts - b.ts); 612 + const waitMs = WRITE_WINDOW_MS - (Date.now() - sorted[needed - 1].ts) + 1; 562 613 await new Promise((resolve) => setTimeout(resolve, waitMs)); 563 614 } 564 615 ··· 569 620 const writtenIds = batch.map((op) => op.rkey ?? op.value?.id).filter( 570 621 Boolean, 571 622 ); 572 - 573 - await recordWritten(writtenIds); 623 + await this.#recordWritten(writtenIds); 574 624 575 625 if (result?.commit?.rev) { 576 626 this.#rev.value = result.commit.rev; ··· 578 628 }; 579 629 580 630 for (let i = 0; i < deletes.length; i += deleteBatchSize) { 631 + if (token.cancelled) return; 581 632 await applyBatch(deletes.slice(i, i + deleteBatchSize)); 582 633 } 583 634 584 635 for (let i = 0; i < upserts.length; i += upsertBatchSize) { 636 + if (token.cancelled) return; 585 637 await applyBatch(upserts.slice(i, i + upsertBatchSize)); 586 638 } 587 639 } catch (err) {
+13 -8
src/components/transformer/output/raw/atproto-sync/element.js
··· 153 153 await remote[name].reload(); 154 154 } 155 155 156 - const localHasData = COLLECTIONS.some((name) => { 157 - const col = l[name].collection(); 158 - return col.state === "loaded" && Array.isArray(col.data) && 159 - col.data.length > 0; 160 - }); 156 + const localCollections = await Promise.all( 157 + COLLECTIONS.map((name) => Output.data(l[name])), 158 + ); 159 + 160 + const localHasData = localCollections.some( 161 + (data) => Array.isArray(data) && data.length > 0, 162 + ); 161 163 162 164 if (!localHasData && !dirty) { 163 165 // Local is empty and clean — just pull remote ··· 197 199 this.#clearDirty(); 198 200 } catch (err) { 199 201 console.warn("Sync failed:", err); 202 + } finally { 200 203 this.#syncing = false; 201 204 } 202 205 } ··· 219 222 #mergeRecords(collection, localArr, remoteArr) { 220 223 const tombstones = this.#getTombstones(collection); 221 224 const knownIds = this.#getKnownIds(collection); 225 + const remoteIds = new Set(remoteArr.map((r) => r.id)); 222 226 223 227 /** @type {Map<string, T>} */ 224 228 const merged = new Map(); 225 229 226 230 // Start with local records 227 231 for (const record of localArr) { 228 - if (!tombstones.has(record.id)) { 229 - merged.set(record.id, record); 230 - } 232 + if (tombstones.has(record.id)) continue; 233 + // If previously synced but now absent from remote, it was deleted remotely. 234 + if (knownIds.has(record.id) && !remoteIds.has(record.id)) continue; 235 + merged.set(record.id, record); 231 236 } 232 237 233 238 // Merge remote records