work-in-progress atproto PDS
typescript atproto pds atcute
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: make blob uploads more reliable

i hate bun

Mary 98197788 f840ace6

+83 -25
+12 -3
packages/danaus/src/actors/blob-store/disk.ts
@@ -37,11 +37,20 @@
     return path.join(this.directory, cid);
   }
 
-  async putTemp(data: Request): Promise<string> {
+  async putTemp(stream: ReadableStream<Uint8Array>): Promise<string> {
     const tempKey = nanoid();
+    const tempPath = this.getTempPath(tempKey);
+
+    await mkdir(this.tempDirectory, { recursive: true });
+
+    const file = Bun.file(tempPath);
+    const writer = file.writer();
 
-    const temp = Bun.file(this.getTempPath(tempKey));
-    await temp.write(data);
+    for await (const chunk of stream) {
+      writer.write(chunk);
+    }
+
+    await writer.end();
 
     return tempKey;
   }
+8 -2
packages/danaus/src/actors/blob-store/s3.ts
@@ -39,11 +39,17 @@
     return `blocks/${this.did}/${cid}`;
   }
 
-  async putTemp(data: Request): Promise<string> {
+  async putTemp(stream: ReadableStream<Uint8Array>): Promise<string> {
     const tempKey = nanoid();
 
     const temp = this.client.file(this.getTempPath(tempKey));
-    await temp.write(data);
+    const writer = temp.writer();
+
+    for await (const chunk of stream) {
+      writer.write(chunk);
+    }
+
+    await writer.end();
 
     return tempKey;
   }
+1 -1
packages/danaus/src/actors/blob-store/types.ts
@@ -1,5 +1,5 @@
 export interface BlobStore {
-  putTemp(data: Request): Promise<string>;
+  putTemp(stream: ReadableStream<Uint8Array>): Promise<string>;
   putPermanent(cid: string, data: Request): Promise<void>;
 
   makePermanent(tempKey: string, cid: string): Promise<void>;
+61 -18
packages/danaus/src/api/com.atproto/repo.uploadBlob.ts
@@ -28,10 +28,11 @@
 
   const blobStore = actorManager.resources.createBlobStore(auth.did);
 
-  const [{ digest, size }, tempKey] = await Promise.all([
-    hashBlob(request.clone() as Request, config.service.blobs.maxUploadSize),
-    blobStore.putTemp(request),
-  ]);
+  const { stream, result } = hashingStream(request.body!, config.service.blobs.maxUploadSize);
+
+  const tempKey = await blobStore.putTemp(stream);
+
+  const { digest, size: hashSize } = await result;
 
   const cid = CID.toString(CID.fromDigest(CID.CODEC_RAW, digest));
 
@@ -51,7 +52,7 @@
         cid: cid,
         created_at: new Date(),
         mime_type: mimeType,
-        size: size,
+        size: hashSize,
         temp_key: tempKey,
       })
       .onConflictDoUpdate({
@@ -66,7 +67,7 @@
     return {
       cid: cid,
       mimeType: mimeType,
-      size: size,
+      size: hashSize,
     };
   });
 
@@ -82,22 +83,64 @@
   });
 };
 
-const hashBlob = async (request: Request, maxSize: number): Promise<{ digest: Uint8Array; size: number }> => {
+interface HashingStreamResult {
+  stream: ReadableStream<Uint8Array>;
+  result: Promise<{ digest: Uint8Array; size: number }>;
+}
+
+/**
+ * create a passthrough stream that hashes data as it flows through.
+ * uses pull-based reading to work with bun's stream implementation.
+ * @param input input stream
+ * @param maxSize maximum allowed size
+ * @returns passthrough stream and promise for digest and size
+ */
+const hashingStream = (input: ReadableStream<Uint8Array>, maxSize: number): HashingStreamResult => {
   const hasher = createHash('sha256');
   let size = 0;
 
-  for await (const chunk of request.body!) {
-    size += chunk.length;
+  const { promise: result, resolve, reject } = Promise.withResolvers<{ digest: Uint8Array; size: number }>();
+
+  let reader: ReadableStreamDefaultReader<Uint8Array>;
+
+  const stream = new ReadableStream<Uint8Array>({
+    start() {
+      reader = input.getReader() as any;
+    },
+    async pull(controller) {
+      try {
+        const { done, value } = await reader.read();
+
+        if (done) {
+          resolve({ digest: new Uint8Array(hasher.digest()), size });
+          controller.close();
+          return;
+        }
+
+        size += value.length;
 
-    if (size > maxSize) {
-      throw new InvalidRequestError({
-        error: 'BlobTooLarge',
-        description: `blob exceeds upload size limit`,
-      });
-    }
+        if (size > maxSize) {
+          const err = new InvalidRequestError({
+            error: 'BlobTooLarge',
+            description: `blob exceeds upload size limit`,
+          });
+          reject(err);
+          controller.error(new Error('blob too large'));
+          reader.cancel();
+          return;
+        }
 
-    hasher.update(chunk);
-  }
+        hasher.update(value);
+        controller.enqueue(value);
+      } catch (err) {
+        reject(err);
+        controller.error(err);
+      }
+    },
+    cancel() {
+      reader.cancel();
+    },
+  });
 
-  return { digest: new Uint8Array(hasher.digest()), size };
+  return { stream, result };
 };
+1 -1
packages/danaus/src/api/com.atproto/sync.getBlob.ts
@@ -31,7 +31,7 @@
 
   const blob = result.blob;
 
-  return new Response(blob.size > 0 ? blob : null, {
+  return new Response(blob.size > 0 ? blob.stream() : null, {
     status: 200,
     headers: {
       'content-type': result.metadata.mimeType,