a collection of lightweight TypeScript packages for AT Protocol, the protocol powering Bluesky
atproto bluesky typescript npm
101
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(car): streaming reader fails when two records share the same CID

fixes https://github.com/mary-ext/atcute/issues/68

Mary 2287359a 041f5ef1

+140 -21
+5
.changeset/ninety-boats-design.md
··· 1 + --- 2 + '@atcute/repo': patch 3 + --- 4 + 5 + fix streaming reader losing records when multiple MST entries reference the same CID
+96 -1
packages/utilities/repo/lib/index.test.ts
··· 1 - import { fromCidLink, toString } from '@atcute/cid'; 1 + import { serializeCarEntry, serializeCarHeader } from '@atcute/car'; 2 + import * as CBOR from '@atcute/cbor'; 3 + import { toBytes } from '@atcute/cbor'; 4 + import * as CID from '@atcute/cid'; 5 + import { fromCidLink, toCidLink, toString } from '@atcute/cid'; 6 + import { MSTNode } from '@atcute/mst'; 2 7 import { fromBase64 } from '@atcute/multibase'; 8 + import { concat } from '@atcute/uint8array'; 3 9 4 10 import { describe, expect, it } from 'vitest'; 5 11 6 12 import { fromStream, fromUint8Array, repoEntryTransform } from './index.ts'; 13 + import type { Commit } from './types.ts'; 14 + 15 + /** 16 + * builds a minimal CAR file containing a commit, single MST node, and one 17 + * record block. the MST node has two entries (different keys) pointing to 18 + * the same record CID. 19 + */ 20 + const buildDuplicateCidCar = async (): Promise<{ 21 + car: Uint8Array; 22 + recordCid: string; 23 + record: unknown; 24 + keys: [string, string]; 25 + }> => { 26 + const record = { $type: 'app.bsky.feed.post', text: 'hello', createdAt: '2025-01-01T00:00:00.000Z' }; 27 + const recordBytes = CBOR.encode(record); 28 + const recordCid = await CID.create(0x71, recordBytes); 29 + const recordLink = toCidLink(recordCid); 30 + 31 + // two different keys pointing to the same record CID 32 + const keys = ['app.bsky.feed.post/aaaa', 'app.bsky.feed.post/aaab'] as [string, string]; 33 + const node = await MSTNode.create(keys, [recordLink, recordLink], [null, null, null]); 34 + const nodeBytes = await node.serialize(); 35 + const nodeCid = fromCidLink(await node.cid()); 36 + 37 + const commit: Commit = { 38 + version: 3, 39 + did: 'did:plc:test', 40 + data: await node.cid(), 41 + rev: '1', 42 + prev: null, 43 + sig: toBytes(new Uint8Array(64)), 44 + }; 45 + const commitBytes = CBOR.encode(commit); 46 + const commitCid = await CID.create(0x71, commitBytes); 47 + const commitLink = toCidLink(commitCid); 48 + 49 + const chunks = [ 50 + serializeCarHeader([commitLink]), 51 + serializeCarEntry(commitCid.bytes, commitBytes), 52 + serializeCarEntry(nodeCid.bytes, nodeBytes), 53 + serializeCarEntry(recordCid.bytes, recordBytes), 54 + ]; 55 + 56 + return { 57 + car: concat(chunks), 58 + recordCid: toString(recordCid), 59 + record, 60 + keys, 61 + }; 62 + }; 7 63 8 64 describe('fromUint8Array', () => { 9 65 it('decodes atproto car files', () => { ··· 233 289 ]); 234 290 }); 235 291 }); 292 + 293 + describe('duplicate CID handling', () => { 294 + it('fromUint8Array yields both records when two keys share a CID', async () => { 295 + const { car, recordCid, record } = await buildDuplicateCidCar(); 296 + 297 + const result = Array.from(fromUint8Array(car), (entry) => ({ 298 + collection: entry.collection, 299 + rkey: entry.rkey, 300 + cid: toString(fromCidLink(entry.cid)), 301 + record: entry.record, 302 + })); 303 + 304 + expect(result).toEqual([ 305 + { collection: 'app.bsky.feed.post', rkey: 'aaaa', cid: recordCid, record }, 306 + { collection: 'app.bsky.feed.post', rkey: 'aaab', cid: recordCid, record }, 307 + ]); 308 + }); 309 + 310 + it('fromStream yields both records when two keys share a CID', async () => { 311 + const { car, recordCid, record } = await buildDuplicateCidCar(); 312 + 313 + const blob = new Blob([car]); 314 + await using repo = fromStream(blob.stream()); 315 + 316 + const result = await Array.fromAsync(repo, (entry) => ({ 317 + collection: entry.collection, 318 + rkey: entry.rkey, 319 + cid: toString(fromCidLink(entry.cid)), 320 + record: entry.record, 321 + })); 322 + 323 + expect(repo.missingBlocks).toEqual([]); 324 + 325 + expect(result).toEqual([ 326 + { collection: 'app.bsky.feed.post', rkey: 'aaaa', cid: recordCid, record }, 327 + { collection: 'app.bsky.feed.post', rkey: 'aaab', cid: recordCid, record }, 328 + ]); 329 + }); 330 + });
+39 -20
packages/utilities/repo/lib/streamed-reader.ts
··· 34 34 }; 35 35 36 36 export interface StreamedRepoReader { 37 - /** 38 - * list of blocks that were referenced but not found in the repository. 39 - * blocks may be reported as missing if multiple records share the same CID. 40 - */ 37 + /** list of blocks that were referenced but not found in the repository */ 41 38 readonly missingBlocks: readonly MissingBlockEntry[]; 42 39 43 40 dispose(): Promise<void>; ··· 97 94 const car = CAR.fromStream(stream); 98 95 99 96 try { 100 - const pending = new Map<string, EntryMeta>(); 97 + const pending = new Map<string, EntryMeta[]>(); 101 98 const strays = new Map<string, CarEntry>(); 102 99 103 100 const queue = new Queue<Task>(); ··· 109 106 strays.delete(cid); 110 107 queue.enqueue({ c: cid, e: entry, m: meta }); 111 108 } else { 112 - pending.set(cid, meta); 109 + const metas = pending.get(cid); 110 + 111 + if (metas !== undefined) { 112 + metas.push(meta); 113 + } else { 114 + pending.set(cid, [meta]); 115 + } 113 116 } 114 117 }; 115 118 ··· 125 128 const cid = CID.toString(entry.cid); 126 129 127 130 { 128 - const meta = pending.get(cid); 131 + const metas = pending.get(cid); 129 132 130 - if (meta !== undefined) { 133 + if (metas !== undefined) { 131 134 pending.delete(cid); 132 - queue.enqueue({ c: cid, e: entry, m: meta }); 135 + 136 + for (let i = 0, il = metas.length; i < il; i++) { 137 + queue.enqueue({ c: cid, e: entry, m: metas[i] }); 138 + } 133 139 } else { 134 140 strays.set(cid, entry); 135 141 } ··· 188 194 } 189 195 } 190 196 191 - missingBlocks = Array.from(pending, ([cid, meta]): MissingBlockEntry => { 192 - switch (meta.t) { 193 - case 0: { 194 - return { cid, type: 'commit' }; 195 - } 196 - case 1: { 197 - return { cid, type: 'mst-node' }; 198 - } 199 - case 2: { 200 - return { cid, type: 'record', key: meta.k }; 197 + { 198 + const missing: MissingBlockEntry[] = []; 199 + 200 + for (const [cid, metas] of pending) { 201 + for (let i = 0, il = metas.length; i < il; i++) { 202 + const meta = metas[i]; 203 + 204 + switch (meta.t) { 205 + case 0: { 206 + missing.push({ cid, type: 'commit' }); 207 + break; 208 + } 209 + case 1: { 210 + missing.push({ cid, type: 'mst-node' }); 211 + break; 212 + } 213 + case 2: { 214 + missing.push({ cid, type: 'record', key: meta.k }); 215 + break; 216 + } 217 + } 201 218 } 202 219 } 203 - }); 220 + 221 + missingBlocks = missing; 222 + } 204 223 } finally { 205 224 await car.dispose(); 206 225 }