A music player that connects to your cloud/distributed storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: track bundle atproto deletes should still use normal batch size

+21 -10
+19 -10
src/components/output/raw/atproto/element.js
··· 82 82 } 83 83 84 84 return this.#putRecords("sh.diffuse.output.trackBundle", bundles, { 85 - batchSize: 1, 85 + upsertBatchSize: 1, 86 86 }); 87 87 }, 88 88 }, ··· 306 306 /** 307 307 * @param {string} collection 308 308 * @param {Array<{ id: string }>} data 309 - * @param {{ batchSize?: number }} [options] 309 + * @param {{ deleteBatchSize?: number, upsertBatchSize?: number }} [options] 310 310 */ 311 - async #putRecords(collection, data, { batchSize = 100 } = {}) { 311 + async #putRecords(collection, data, { deleteBatchSize = 100, upsertBatchSize = deleteBatchSize } = {}) { 312 312 const rpc = this.#rpc; 313 313 if (!rpc || !this.#did.value) return; 314 314 ··· 343 343 344 344 // 3. Compute diff 345 345 /** @type {unknown[]} */ 346 - const writes = []; 346 + const deletes = []; 347 + 348 + /** @type {unknown[]} */ 349 + const upserts = []; 347 350 348 351 for (const [id, { rkey }] of existing) { 349 352 if (!desired.has(id)) { 350 - writes.push({ 353 + deletes.push({ 351 354 $type: "com.atproto.repo.applyWrites#delete", 352 355 collection, 353 356 rkey, ··· 359 362 const entry = existing.get(id); 360 363 361 364 if (!entry) { 362 - writes.push({ 365 + upserts.push({ 363 366 $type: "com.atproto.repo.applyWrites#create", 364 367 collection, 365 368 rkey: id, 366 369 value: record, 367 370 }); 368 371 } else if (JSON.stringify(entry.value) !== JSON.stringify(record)) { 369 - writes.push({ 372 + upserts.push({ 370 373 $type: "com.atproto.repo.applyWrites#update", 371 374 collection, 372 375 rkey: entry.rkey, ··· 376 379 } 377 380 378 381 // 4. Apply in batches 379 - for (let i = 0; i < writes.length; i += batchSize) { 380 - const batch = writes.slice(i, i + batchSize); 381 - 382 + const applyBatch = async (/** @type {unknown[]} */ batch) => { 382 383 const result = await ok(rpc.post("com.atproto.repo.applyWrites", { 383 384 input: { repo: this.#did.value, writes: batch }, 384 385 })); ··· 386 387 if (result?.commit?.rev) { 387 388 this.#rev.value = result.commit.rev; 388 389 } 390 + }; 391 + 392 + for (let i = 0; i < deletes.length; i += deleteBatchSize) { 393 + await applyBatch(deletes.slice(i, i + deleteBatchSize)); 394 + } 395 + 396 + for (let i = 0; i < upserts.length; i += upsertBatchSize) { 397 + await applyBatch(upserts.slice(i, i + upsertBatchSize)); 389 398 } 390 399 } catch (err) { 391 400 if (this.#isSessionError(err)) {
+2
src/components/transformer/output/raw/atproto-sync/element.js
··· 39 39 * @extends {OutputTransformer<null>} 40 40 */ 41 41 class ATProtoOutputSyncTransformer extends OutputTransformer { 42 + static NAME = "diffuse/transformer/output/raw/atproto-sync"; 43 + 42 44 constructor() { 43 45 super(); 44 46