MIRROR: javascript for 🐜's, a tiny runtime with big ambitions
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 356 lines 8.6 kB view raw
import { test, testDeep, testThrows, summary } from './helpers.js';

console.log('ReadableStream pipe operations (tee, pipeTo, pipeThrough) Tests\n');

/**
 * tee() on a stream pre-filled in start(): checks the return shape, that the
 * original becomes locked, and that each branch yields 'a', 'b', then done.
 */
async function testTeeBasic() {
  const rs = new ReadableStream({
    start(c) {
      c.enqueue('a');
      c.enqueue('b');
      c.close();
    }
  });

  // Inspect the value tee() actually returned; wrapping the destructured
  // branches in a fresh literal would make the array check always pass.
  const branches = rs.tee();
  test('tee returns array', Array.isArray(branches), true);
  const [b1, b2] = branches;
  test('tee branch1 is ReadableStream', b1 instanceof ReadableStream, true);
  test('tee branch2 is ReadableStream', b2 instanceof ReadableStream, true);
  test('tee locks original', rs.locked, true);

  const r1 = b1.getReader();
  const r2 = b2.getReader();

  // Drain branch 1 completely before touching branch 2.
  const v1a = await r1.read();
  test('tee b1 read1 value', v1a.value, 'a');
  test('tee b1 read1 done', v1a.done, false);
  const v1b = await r1.read();
  test('tee b1 read2 value', v1b.value, 'b');
  const v1c = await r1.read();
  test('tee b1 done', v1c.done, true);

  const v2a = await r2.read();
  test('tee b2 read1 value', v2a.value, 'a');
  const v2b = await r2.read();
  test('tee b2 read2 value', v2b.value, 'b');
  const v2c = await r2.read();
  test('tee b2 done', v2c.done, true);
}

/**
 * tee() on a pull-driven source that enqueues 1, 2, 3 and then closes:
 * both branches must observe the same three chunks.
 */
async function testTeePullBased() {
  let pullCount = 0;
  const rs = new ReadableStream({
    pull(c) {
      pullCount++;
      if (pullCount <= 3) {
        c.enqueue(pullCount);
      } else {
        c.close();
      }
    }
  });
  const [b1, b2] = rs.tee();
  const r1 = b1.getReader();
  const r2 = b2.getReader();

  const chunks1 = [];
  const chunks2 = [];
  while (true) {
    const { value, done } = await r1.read();
    if (done) break;
    chunks1.push(value);
  }
  while (true) {
    const { value, done } = await r2.read();
    if (done) break;
    chunks2.push(value);
  }
  testDeep('tee pull b1 chunks', chunks1, [1, 2, 3]);
  testDeep('tee pull b2 chunks', chunks2, [1, 2, 3]);
}

/**
 * Cancelling only one tee branch must not invoke the source's cancel(), and
 * the other branch must keep delivering chunks.
 */
async function testTeeCancelOne() {
  let cancelReason = null;
  const rs = new ReadableStream({
    pull(c) {
      c.enqueue('x');
    },
    cancel(r) {
      cancelReason = r;
    }
  });
  const [b1, b2] = rs.tee();
  const cancelPromise = b1.cancel('reason1');
  test('tee cancel one does not cancel original', cancelReason, null);

  const r2 = b2.getReader();
  const v = await r2.read();
  test('tee other branch still works', v.value, 'x');

  // The first branch's cancel does not settle until the other branch finishes.
  await r2.cancel('reason2');
  await cancelPromise;
}

/**
 * Cancelling both tee branches must invoke the source's cancel() with an
 * array holding both reasons in branch order.
 */
async function testTeeCancelBoth() {
  let cancelReason = null;
  const rs = new ReadableStream({
    pull(c) {
      c.enqueue('x');
    },
    cancel(r) {
      cancelReason = r;
    }
  });
  const [b1, b2] = rs.tee();
  const p1 = b1.cancel('r1');
  const p2 = b2.cancel('r2');
  await Promise.all([p1, p2]);
  test('tee cancel both cancels original', Array.isArray(cancelReason), true);
  testDeep('tee composite reason', cancelReason, ['r1', 'r2']);
}

// tee() on a stream that already has a reader must throw synchronously.
testThrows('tee locked stream throws', () => {
  const rs = new ReadableStream();
  rs.getReader();
  rs.tee();
});

/** pipeTo() delivers every chunk of the source to the sink's write(), in order. */
async function testPipeToBasic() {
  const chunks = [];
  const rs = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    }
  });
  const ws = new WritableStream({
    write(chunk) {
      chunks.push(chunk);
    }
  });
  await rs.pipeTo(ws);
  testDeep('pipeTo basic chunks', chunks, [1, 2, 3]);
}

/** By default pipeTo() calls the sink's close() once the source closes. */
async function testPipeToClosesDestination() {
  let closed = false;
  const rs = new ReadableStream({
    start(c) {
      c.enqueue('a');
      c.close();
    }
  });
  const ws = new WritableStream({
    write() {},
    close() {
      closed = true;
    }
  });
  await rs.pipeTo(ws);
  test('pipeTo closes dest', closed, true);
}

/** With { preventClose: true } the sink's close() must not be called. */
async function testPipeToPreventClose() {
  let closed = false;
  const rs = new ReadableStream({
    start(c) {
      c.close();
    }
  });
  const ws = new WritableStream({
    close() {
      closed = true;
    }
  });
  await rs.pipeTo(ws, { preventClose: true });
  test('pipeTo preventClose', closed, false);
}

/**
 * An errored source must reject pipeTo() with that same error and, since
 * preventAbort is not set, abort the destination with it as the reason.
 */
async function testPipeToErrorPropagation() {
  const err = new Error('source error');
  const rs = new ReadableStream({
    start(c) {
      c.error(err);
    }
  });
  let abortReason = null;
  const ws = new WritableStream({
    write() {},
    abort(r) {
      abortReason = r;
    }
  });
  try {
    await rs.pipeTo(ws);
    test('pipeTo should reject on source error', false, true);
  } catch (e) {
    test('pipeTo rejects with source error', e, err);
  }
  // The pipe only settles after the destination abort has run, so the reason
  // captured by the sink can be checked here.
  test('pipeTo aborts dest with source error', abortReason, err);
}

/** With { preventAbort: true } a source error must not call the sink's abort(). */
async function testPipeToPreventAbort() {
  const err = new Error('source error');
  const rs = new ReadableStream({
    start(c) {
      c.error(err);
    }
  });
  let aborted = false;
  const ws = new WritableStream({
    write() {},
    abort() {
      aborted = true;
    }
  });
  try {
    await rs.pipeTo(ws, { preventAbort: true });
  } catch (e) {
    // Deliberately ignored: this test only cares whether abort() ran; the
    // rejection itself is covered by testPipeToErrorPropagation.
  }
  test('pipeTo preventAbort', aborted, false);
}

/** pipeTo() locks both ends while the pipe is active and unlocks them once it settles. */
async function testPipeToLocksStreams() {
  const rs = new ReadableStream({
    start(c) {
      c.close();
    }
  });
  const ws = new WritableStream();
  const p = rs.pipeTo(ws);
  test('pipeTo locks source', rs.locked, true);
  test('pipeTo locks dest', ws.locked, true);
  await p;
  test('pipeTo unlocks source', rs.locked, false);
  test('pipeTo unlocks dest', ws.locked, false);
}

/**
 * Aborting the signal from inside the first write() must reject pipeTo() and
 * stop any further chunk from reaching the sink.
 */
async function testPipeToWithSignal() {
  const ac = new AbortController();
  const rs = new ReadableStream({
    start(c) {
      c.enqueue('a');
      c.enqueue('b');
      c.close();
    }
  });
  const chunks = [];
  const ws = new WritableStream({
    write(chunk) {
      chunks.push(chunk);
      ac.abort();
    }
  });

  try {
    await rs.pipeTo(ws, { signal: ac.signal });
    test('pipeTo with signal should reject', false, true);
  } catch (e) {
    test('pipeTo aborted by signal', true, true);
    // The rejection must be neither a TypeError nor mention "AbortSignal" in
    // its message — presumably to rule out an argument-validation failure
    // being mistaken for a genuine abort.
    test('pipeTo signal rejects with abort semantics',
      !(e instanceof TypeError) && !(e && /AbortSignal/.test(String(e && e.message))),
      true);
  }
  testDeep('pipeTo signal stops further writes', chunks, ['a']);
}

/** A signal that is already aborted before pipeTo() is called must make it reject. */
async function testPipeToAlreadyAbortedSignal() {
  const ac = new AbortController();
  ac.abort('pre-aborted');
  const rs = new ReadableStream({
    start(c) {
      c.enqueue('x');
      c.close();
    }
  });
  const ws = new WritableStream();
  try {
    await rs.pipeTo(ws, { signal: ac.signal });
    test('pipeTo pre-aborted should reject', false, true);
  } catch (e) {
    test('pipeTo pre-aborted signal rejects', true, true);
  }
}

/** pipeTo() on a source that already has a reader must reject (not throw synchronously). */
async function testPipeToRejectsLocked() {
  const rs = new ReadableStream();
  rs.getReader();
  const ws = new WritableStream();
  try {
    await rs.pipeTo(ws);
    test('pipeTo locked source should reject', false, true);
  } catch (e) {
    test('pipeTo locked source rejects', true, true);
  }
}

/**
 * pipeThrough() with a hand-rolled { writable, readable } pair that multiplies
 * each chunk by 10: the returned readable must yield the transformed chunks.
 */
async function testPipeThroughBasic() {
  const rs = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    }
  });

  // Captured from the readable side's start() so the writable side can feed it.
  let transformController;
  const transform = {
    writable: new WritableStream({
      write(chunk) {
        transformController.enqueue(chunk * 10);
      },
      close() {
        transformController.close();
      }
    }),
    readable: new ReadableStream({
      start(c) {
        transformController = c;
      }
    })
  };

  const result = rs.pipeThrough(transform);
  test('pipeThrough returns readable', result instanceof ReadableStream, true);

  const reader = result.getReader();
  const chunks = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  testDeep('pipeThrough transforms data', chunks, [10, 20, 30]);
}

// pipeThrough() on a source that already has a reader must throw synchronously.
testThrows('pipeThrough locked source throws', () => {
  const rs = new ReadableStream();
  rs.getReader();
  rs.pipeThrough({ writable: new WritableStream(), readable: new ReadableStream() });
});

// pipeThrough() without a transform argument must throw.
testThrows('pipeThrough missing transform throws', () => {
  new ReadableStream().pipeThrough();
});

// pipeThrough() into a writable that already has a writer must throw.
testThrows('pipeThrough locked writable throws', () => {
  const ws = new WritableStream();
  ws.getWriter();
  new ReadableStream().pipeThrough({ writable: ws, readable: new ReadableStream() });
});

// Run the async cases one at a time so their output stays in a stable order.
await testTeeBasic();
await testTeePullBased();
await testTeeCancelOne();
await testTeeCancelBoth();

await testPipeToBasic();
await testPipeToClosesDestination();
await testPipeToPreventClose();
await testPipeToErrorPropagation();
await testPipeToPreventAbort();
await testPipeToLocksStreams();
await testPipeToWithSignal();
await testPipeToAlreadyAbortedSignal();
await testPipeToRejectsLocked();

await testPipeThroughBasic();

summary();