A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

chore(atproto): back to listing records regularly

+19 -54
+19 -54
src/components/output/raw/atproto/element.js
··· 2 2 import { ComAtprotoSyncSubscribeRepos } from "@atcute/atproto"; 3 3 import { decode, encode } from "@atcute/cbor"; 4 4 import { xxh32r } from "xxh32/dist/raw.js"; 5 - import * as Repo from "@atcute/repo"; 6 5 import * as IDB from "idb-keyval"; 7 6 8 7 import { computed, signal } from "~/common/signal.js"; ··· 396 395 } 397 396 398 397 /** 399 - * Fetch the full repo CAR for the authenticated user, cached in IDB by rev. 400 - * Returns null if not authenticated or if the commit rev cannot be determined. 401 - * 402 - * @returns {Promise<Uint8Array | null>} 403 - */ 404 - async #getRepoCar() { 405 - const did = this.#did.value; 406 - const rpc = this.#rpc; 407 - if (!rpc || !did) return null; 408 - 409 - const REV_TTL_MS = 5_000; 410 - const latestRev = 411 - (Date.now() - this.#revFetchedAt < REV_TTL_MS && this.#rev.value) 412 - ? this.#rev.value 413 - : await this.getLatestCommit(); 414 - if (!latestRev) return null; 415 - 416 - const IDB_KEY = `diffuse/output/raw/atproto/repo/${did}`; 417 - const cached = 418 - /** @type {{ rev: string, bytes: Uint8Array } | undefined} */ ( 419 - await IDB.get(IDB_KEY) 420 - ); 421 - 422 - if (cached?.rev === latestRev) { 423 - return cached.bytes; 424 - } 425 - 426 - const bytes = await ok(rpc.get("com.atproto.sync.getRepo", { 427 - params: { did }, 428 - as: "bytes", 429 - })); 430 - 431 - await IDB.set(IDB_KEY, { rev: latestRev, bytes }); 432 - return bytes; 433 - } 434 - 435 - /** 436 398 * @param {Uint8Array} bytes 437 399 * @returns {Promise<any>} 438 400 */ ··· 472 434 if (!this.#rpc || !did) return []; 473 435 474 436 try { 475 - const bytes = await this.#getRepoCar(); 476 - if (!bytes) return []; 477 - 478 437 const records = []; 479 - for (const entry of Repo.fromUint8Array(bytes)) { 480 - if (entry.collection === collection) { 481 - records.push(/** @type {T} */ (entry.record)); 482 - } 483 - } 438 + let cursor; 439 + do { 440 + const page = await ok(this.#rpc.get("com.atproto.repo.listRecords", { 441 + params: { repo: did, collection, limit: 100, cursor }, 442 + })); 443 + records.push(...page.records.map((r) => /** @type {T} */ (r.value))); 444 + cursor = page.cursor; 445 + } while (cursor); 484 446 return records; 485 447 } catch (err) { 486 448 if (this.#isSessionError(err)) { ··· 600 562 /** @type {Map<string, { rkey: string, value: unknown }>} */ 601 563 const existing = new Map(); 602 564 603 - const repoBytes = await this.#getRepoCar(); 604 - if (repoBytes) { 605 - for (const entry of Repo.fromUint8Array(repoBytes)) { 606 - if (entry.collection === collection) { 607 - const record = /** @type {any} */ (entry.record); 608 - existing.set(record.id, { rkey: entry.rkey, value: record }); 609 - } 565 + let cursor; 566 + do { 567 + const page = await ok(rpc.get("com.atproto.repo.listRecords", { 568 + params: { repo: did, collection, limit: 100, cursor }, 569 + })); 570 + for (const { uri, value } of page.records) { 571 + const record = /** @type {any} */ (value); 572 + const rkey = uri.split("/").at(-1); 573 + existing.set(record.id, { rkey, value: record }); 610 574 } 611 - } 575 + cursor = page.cursor; 576 + } while (cursor); 612 577 613 578 // 2. Build desired state 614 579 const desired = new Map(