MIRROR: javascript for ๐Ÿœ's, a tiny runtime with big ambitions
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

add streams-pipe spec

+353
+353
examples/spec/streams-pipe.js
··· 1 + import { test, testDeep, testThrows, summary } from './helpers.js'; 2 + 3 + console.log('ReadableStream pipe operations (tee, pipeTo, pipeThrough) Tests\n'); 4 + 5 + async function testTeeBasic() { 6 + const rs = new ReadableStream({ 7 + start(c) { 8 + c.enqueue('a'); 9 + c.enqueue('b'); 10 + c.close(); 11 + } 12 + }); 13 + const [b1, b2] = rs.tee(); 14 + test('tee returns array', Array.isArray([b1, b2]), true); 15 + test('tee branch1 is ReadableStream', b1 instanceof ReadableStream, true); 16 + test('tee branch2 is ReadableStream', b2 instanceof ReadableStream, true); 17 + test('tee locks original', rs.locked, true); 18 + 19 + const r1 = b1.getReader(); 20 + const r2 = b2.getReader(); 21 + 22 + const v1a = await r1.read(); 23 + test('tee b1 read1 value', v1a.value, 'a'); 24 + test('tee b1 read1 done', v1a.done, false); 25 + const v1b = await r1.read(); 26 + test('tee b1 read2 value', v1b.value, 'b'); 27 + const v1c = await r1.read(); 28 + test('tee b1 done', v1c.done, true); 29 + 30 + const v2a = await r2.read(); 31 + test('tee b2 read1 value', v2a.value, 'a'); 32 + const v2b = await r2.read(); 33 + test('tee b2 read2 value', v2b.value, 'b'); 34 + const v2c = await r2.read(); 35 + test('tee b2 done', v2c.done, true); 36 + } 37 + 38 + async function testTeePullBased() { 39 + let pullCount = 0; 40 + const rs = new ReadableStream({ 41 + pull(c) { 42 + pullCount++; 43 + if (pullCount <= 3) c.enqueue(pullCount); 44 + else c.close(); 45 + } 46 + }); 47 + const [b1, b2] = rs.tee(); 48 + const r1 = b1.getReader(); 49 + const r2 = b2.getReader(); 50 + 51 + const chunks1 = []; 52 + const chunks2 = []; 53 + while (true) { 54 + const { value, done } = await r1.read(); 55 + if (done) break; 56 + chunks1.push(value); 57 + } 58 + while (true) { 59 + const { value, done } = await r2.read(); 60 + if (done) break; 61 + chunks2.push(value); 62 + } 63 + testDeep('tee pull b1 chunks', chunks1, [1, 2, 3]); 64 + testDeep('tee pull b2 chunks', chunks2, [1, 2, 3]); 65 + } 66 + 67 + async function testTeeCancelOne() { 68 + let cancelReason = null; 69 + const rs = new ReadableStream({ 70 + pull(c) { 71 + c.enqueue('x'); 72 + }, 73 + cancel(r) { 74 + cancelReason = r; 75 + } 76 + }); 77 + const [b1, b2] = rs.tee(); 78 + const cancelPromise = b1.cancel('reason1'); 79 + test('tee cancel one does not cancel original', cancelReason, null); 80 + 81 + const r2 = b2.getReader(); 82 + const v = await r2.read(); 83 + test('tee other branch still works', v.value, 'x'); 84 + 85 + // The first branch's cancel does not settle until the other branch finishes. 86 + await r2.cancel('reason2'); 87 + await cancelPromise; 88 + } 89 + 90 + async function testTeeCancelBoth() { 91 + let cancelReason = null; 92 + const rs = new ReadableStream({ 93 + pull(c) { 94 + c.enqueue('x'); 95 + }, 96 + cancel(r) { 97 + cancelReason = r; 98 + } 99 + }); 100 + const [b1, b2] = rs.tee(); 101 + const p1 = b1.cancel('r1'); 102 + const p2 = b2.cancel('r2'); 103 + await Promise.all([p1, p2]); 104 + test('tee cancel both cancels original', Array.isArray(cancelReason), true); 105 + testDeep('tee composite reason', cancelReason, ['r1', 'r2']); 106 + } 107 + 108 + testThrows('tee locked stream throws', () => { 109 + const rs = new ReadableStream(); 110 + rs.getReader(); 111 + rs.tee(); 112 + }); 113 + 114 + async function testPipeToBasic() { 115 + const chunks = []; 116 + const rs = new ReadableStream({ 117 + start(c) { 118 + c.enqueue(1); 119 + c.enqueue(2); 120 + c.enqueue(3); 121 + c.close(); 122 + } 123 + }); 124 + const ws = new WritableStream({ 125 + write(chunk) { 126 + chunks.push(chunk); 127 + } 128 + }); 129 + await rs.pipeTo(ws); 130 + testDeep('pipeTo basic chunks', chunks, [1, 2, 3]); 131 + } 132 + 133 + async function testPipeToClosesDestination() { 134 + let closed = false; 135 + const rs = new ReadableStream({ 136 + start(c) { 137 + c.enqueue('a'); 138 + c.close(); 139 + } 140 + }); 141 + const ws = new WritableStream({ 142 + write() {}, 143 + close() { 144 + closed = true; 145 + } 146 + }); 147 + await rs.pipeTo(ws); 148 + test('pipeTo closes dest', closed, true); 149 + } 150 + 151 + async function testPipeToPreventClose() { 152 + let closed = false; 153 + const rs = new ReadableStream({ 154 + start(c) { 155 + c.close(); 156 + } 157 + }); 158 + const ws = new WritableStream({ 159 + close() { 160 + closed = true; 161 + } 162 + }); 163 + await rs.pipeTo(ws, { preventClose: true }); 164 + test('pipeTo preventClose', closed, false); 165 + } 166 + 167 + async function testPipeToErrorPropagation() { 168 + const err = new Error('source error'); 169 + const rs = new ReadableStream({ 170 + start(c) { 171 + c.error(err); 172 + } 173 + }); 174 + let abortReason = null; 175 + const ws = new WritableStream({ 176 + write() {}, 177 + abort(r) { 178 + abortReason = r; 179 + } 180 + }); 181 + try { 182 + await rs.pipeTo(ws); 183 + test('pipeTo should reject on source error', false, true); 184 + } catch (e) { 185 + test('pipeTo rejects with source error', e, err); 186 + } 187 + } 188 + 189 + async function testPipeToPreventAbort() { 190 + const err = new Error('source error'); 191 + const rs = new ReadableStream({ 192 + start(c) { 193 + c.error(err); 194 + } 195 + }); 196 + let aborted = false; 197 + const ws = new WritableStream({ 198 + write() {}, 199 + abort() { 200 + aborted = true; 201 + } 202 + }); 203 + try { 204 + await rs.pipeTo(ws, { preventAbort: true }); 205 + } catch (e) {} 206 + test('pipeTo preventAbort', aborted, false); 207 + } 208 + 209 + async function testPipeToLocksStreams() { 210 + const rs = new ReadableStream({ 211 + start(c) { 212 + c.close(); 213 + } 214 + }); 215 + const ws = new WritableStream(); 216 + const p = rs.pipeTo(ws); 217 + test('pipeTo locks source', rs.locked, true); 218 + test('pipeTo locks dest', ws.locked, true); 219 + await p; 220 + test('pipeTo unlocks source', rs.locked, false); 221 + test('pipeTo unlocks dest', ws.locked, false); 222 + } 223 + 224 + async function testPipeToWithSignal() { 225 + const ac = new AbortController(); 226 + const rs = new ReadableStream({ 227 + start(c) { 228 + c.enqueue('a'); 229 + c.enqueue('b'); 230 + c.close(); 231 + } 232 + }); 233 + const chunks = []; 234 + const ws = new WritableStream({ 235 + write(chunk) { 236 + chunks.push(chunk); 237 + ac.abort(); 238 + } 239 + }); 240 + 241 + try { 242 + await rs.pipeTo(ws, { signal: ac.signal }); 243 + test('pipeTo with signal should reject', false, true); 244 + } catch (e) { 245 + test('pipeTo aborted by signal', true, true); 246 + } 247 + testDeep('pipeTo signal stops further writes', chunks, ['a']); 248 + } 249 + 250 + async function testPipeToAlreadyAbortedSignal() { 251 + const ac = new AbortController(); 252 + ac.abort('pre-aborted'); 253 + const rs = new ReadableStream({ 254 + start(c) { 255 + c.enqueue('x'); 256 + c.close(); 257 + } 258 + }); 259 + const ws = new WritableStream(); 260 + try { 261 + await rs.pipeTo(ws, { signal: ac.signal }); 262 + test('pipeTo pre-aborted should reject', false, true); 263 + } catch (e) { 264 + test('pipeTo pre-aborted signal rejects', true, true); 265 + } 266 + } 267 + 268 + async function testPipeToRejectsLocked() { 269 + const rs = new ReadableStream(); 270 + rs.getReader(); 271 + const ws = new WritableStream(); 272 + try { 273 + await rs.pipeTo(ws); 274 + test('pipeTo locked source should reject', false, true); 275 + } catch (e) { 276 + test('pipeTo locked source rejects', true, true); 277 + } 278 + } 279 + 280 + async function testPipeThroughBasic() { 281 + const rs = new ReadableStream({ 282 + start(c) { 283 + c.enqueue(1); 284 + c.enqueue(2); 285 + c.enqueue(3); 286 + c.close(); 287 + } 288 + }); 289 + 290 + let transformController; 291 + const transform = { 292 + writable: new WritableStream({ 293 + write(chunk) { 294 + transformController.enqueue(chunk * 10); 295 + }, 296 + close() { 297 + transformController.close(); 298 + } 299 + }), 300 + readable: new ReadableStream({ 301 + start(c) { 302 + transformController = c; 303 + } 304 + }) 305 + }; 306 + 307 + const result = rs.pipeThrough(transform); 308 + test('pipeThrough returns readable', result instanceof ReadableStream, true); 309 + 310 + const reader = result.getReader(); 311 + const chunks = []; 312 + while (true) { 313 + const { value, done } = await reader.read(); 314 + if (done) break; 315 + chunks.push(value); 316 + } 317 + testDeep('pipeThrough transforms data', chunks, [10, 20, 30]); 318 + } 319 + 320 + testThrows('pipeThrough locked source throws', () => { 321 + const rs = new ReadableStream(); 322 + rs.getReader(); 323 + rs.pipeThrough({ writable: new WritableStream(), readable: new ReadableStream() }); 324 + }); 325 + 326 + testThrows('pipeThrough missing transform throws', () => { 327 + new ReadableStream().pipeThrough(); 328 + }); 329 + 330 + testThrows('pipeThrough locked writable throws', () => { 331 + const ws = new WritableStream(); 332 + ws.getWriter(); 333 + new ReadableStream().pipeThrough({ writable: ws, readable: new ReadableStream() }); 334 + }); 335 + 336 + await testTeeBasic(); 337 + await testTeePullBased(); 338 + await testTeeCancelOne(); 339 + await testTeeCancelBoth(); 340 + 341 + await testPipeToBasic(); 342 + await testPipeToClosesDestination(); 343 + await testPipeToPreventClose(); 344 + await testPipeToErrorPropagation(); 345 + await testPipeToPreventAbort(); 346 + await testPipeToLocksStreams(); 347 + await testPipeToWithSignal(); 348 + await testPipeToAlreadyAbortedSignal(); 349 + await testPipeToRejectsLocked(); 350 + 351 + await testPipeThroughBasic(); 352 + 353 + summary();