MIRROR: javascript for 🐜's, a tiny runtime with big ambitions
1import { test, testDeep, testThrows, summary } from './helpers.js';
2
// Print the suite banner before any individual test result is reported.
console.log('ReadableStream pipe operations (tee, pipeTo, pipeThrough) Tests\n');
4
/**
 * Checks the basic contract of `ReadableStream.prototype.tee()`: the return
 * value is an array, both branches are ReadableStreams, the source becomes
 * locked, and each branch yields 'a', 'b' and then `done`.
 *
 * @returns {Promise<void>} Settles once both branches have been read to the end.
 */
async function testTeeBasic() {
  const rs = new ReadableStream({
    start(c) {
      c.enqueue('a');
      c.enqueue('b');
      c.close();
    },
  });

  // Inspect the value tee() actually returned. Wrapping the destructured
  // branches in a fresh array literal would make this check always pass.
  const branches = rs.tee();
  test('tee returns array', Array.isArray(branches), true);

  const [b1, b2] = branches;
  test('tee branch1 is ReadableStream', b1 instanceof ReadableStream, true);
  test('tee branch2 is ReadableStream', b2 instanceof ReadableStream, true);
  test('tee locks original', rs.locked, true);

  const r1 = b1.getReader();
  const r2 = b2.getReader();

  // Drain the first branch completely before touching the second one.
  const v1a = await r1.read();
  test('tee b1 read1 value', v1a.value, 'a');
  test('tee b1 read1 done', v1a.done, false);
  const v1b = await r1.read();
  test('tee b1 read2 value', v1b.value, 'b');
  const v1c = await r1.read();
  test('tee b1 done', v1c.done, true);

  // The second branch must still deliver every chunk.
  const v2a = await r2.read();
  test('tee b2 read1 value', v2a.value, 'a');
  const v2b = await r2.read();
  test('tee b2 read2 value', v2b.value, 'b');
  const v2c = await r2.read();
  test('tee b2 done', v2c.done, true);
}
37
/**
 * Tees a pull-driven source that produces 1, 2, 3 and then closes, and checks
 * that each branch, drained one after the other, receives all three chunks.
 *
 * @returns {Promise<void>}
 */
async function testTeePullBased() {
  let pulls = 0;
  const source = new ReadableStream({
    pull(controller) {
      pulls += 1;
      if (pulls > 3) {
        controller.close();
      } else {
        controller.enqueue(pulls);
      }
    },
  });
  const [firstBranch, secondBranch] = source.tee();
  const firstReader = firstBranch.getReader();
  const secondReader = secondBranch.getReader();

  // Read the first branch to completion before starting on the second.
  const fromFirst = [];
  for (let step = await firstReader.read(); !step.done; step = await firstReader.read()) {
    fromFirst.push(step.value);
  }

  const fromSecond = [];
  for (let step = await secondReader.read(); !step.done; step = await secondReader.read()) {
    fromSecond.push(step.value);
  }

  testDeep('tee pull b1 chunks', fromFirst, [1, 2, 3]);
  testDeep('tee pull b2 chunks', fromSecond, [1, 2, 3]);
}
66
/**
 * Cancels only one tee branch and checks that the source's `cancel` hook has
 * not been invoked at that point and that the other branch can still be read.
 *
 * @returns {Promise<void>}
 */
async function testTeeCancelOne() {
  let sourceCancelReason = null;
  const source = new ReadableStream({
    pull(controller) {
      controller.enqueue('x');
    },
    cancel(reason) {
      sourceCancelReason = reason;
    },
  });

  const [cancelledBranch, liveBranch] = source.tee();
  const pendingCancel = cancelledBranch.cancel('reason1');
  test('tee cancel one does not cancel original', sourceCancelReason, null);

  const liveReader = liveBranch.getReader();
  const { value } = await liveReader.read();
  test('tee other branch still works', value, 'x');

  // The first branch's cancel promise only settles after the second branch is
  // finished too, so cancel that one first and then wait for the pending one.
  await liveReader.cancel('reason2');
  await pendingCancel;
}
89
/**
 * Cancels both tee branches and checks that the source's `cancel` hook then
 * receives an array holding both branch reasons, in branch order.
 *
 * @returns {Promise<void>}
 */
async function testTeeCancelBoth() {
  let sourceCancelReason = null;
  const source = new ReadableStream({
    pull(controller) {
      controller.enqueue('x');
    },
    cancel(reason) {
      sourceCancelReason = reason;
    },
  });

  const [first, second] = source.tee();
  // Start both cancellations before awaiting either of them.
  await Promise.all([first.cancel('r1'), second.cancel('r2')]);

  test('tee cancel both cancels original', Array.isArray(sourceCancelReason), true);
  testDeep('tee composite reason', sourceCancelReason, ['r1', 'r2']);
}
107
// tee() on a stream that already has a reader is expected to throw.
testThrows('tee locked stream throws', () => {
  const lockedStream = new ReadableStream();
  lockedStream.getReader(); // acquiring a reader locks the stream
  lockedStream.tee();
});
113
/**
 * Pipes a three-chunk source into a collecting sink and checks that the sink
 * saw the chunks 1, 2, 3 in order.
 *
 * @returns {Promise<void>}
 */
async function testPipeToBasic() {
  const received = [];
  const source = new ReadableStream({
    start(controller) {
      for (const chunk of [1, 2, 3]) {
        controller.enqueue(chunk);
      }
      controller.close();
    },
  });
  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk);
    },
  });

  await source.pipeTo(sink);
  testDeep('pipeTo basic chunks', received, [1, 2, 3]);
}
132
/**
 * Checks that the destination's `close` hook has run by the time a default
 * `pipeTo` from a source that closes has settled.
 *
 * @returns {Promise<void>}
 */
async function testPipeToClosesDestination() {
  let sinkClosed = false;
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.close();
    },
  });
  const sink = new WritableStream({
    write() {},
    close() {
      sinkClosed = true;
    },
  });

  await source.pipeTo(sink);
  test('pipeTo closes dest', sinkClosed, true);
}
150
/**
 * Checks that `pipeTo(..., { preventClose: true })` leaves the destination's
 * `close` hook uncalled even though the source closes.
 *
 * @returns {Promise<void>}
 */
async function testPipeToPreventClose() {
  let sinkClosed = false;
  const source = new ReadableStream({
    start(controller) {
      controller.close();
    },
  });
  const sink = new WritableStream({
    close() {
      sinkClosed = true;
    },
  });

  const pipeOptions = { preventClose: true };
  await source.pipeTo(sink, pipeOptions);
  test('pipeTo preventClose', sinkClosed, false);
}
166
/**
 * Pipes an already-errored source into a sink and checks both halves of
 * forward error propagation: the `pipeTo` promise rejects with the source
 * error, and the destination's `abort` hook receives that same error.
 *
 * @returns {Promise<void>}
 */
async function testPipeToErrorPropagation() {
  const err = new Error('source error');
  const rs = new ReadableStream({
    start(c) {
      c.error(err);
    },
  });
  let abortReason = null;
  const ws = new WritableStream({
    write() {},
    abort(r) {
      abortReason = r;
    },
  });

  try {
    await rs.pipeTo(ws);
    // Reaching this line means the pipe fulfilled; record a failing result.
    test('pipeTo should reject on source error', false, true);
  } catch (e) {
    test('pipeTo rejects with source error', e, err);
  }

  // The reason was captured by the sink but never checked before. Without
  // `preventAbort` the destination must be aborted with the source error.
  test('pipeTo aborts dest with source error', abortReason, err);
}
188
/**
 * Checks that `pipeTo(..., { preventAbort: true })` from an errored source
 * leaves the destination's `abort` hook uncalled.
 *
 * @returns {Promise<void>}
 */
async function testPipeToPreventAbort() {
  const sourceError = new Error('source error');
  const source = new ReadableStream({
    start(controller) {
      controller.error(sourceError);
    },
  });

  let sinkAborted = false;
  const sink = new WritableStream({
    write() {},
    abort() {
      sinkAborted = true;
    },
  });

  try {
    await source.pipeTo(sink, { preventAbort: true });
  } catch (ignored) {
    // Whether the pipe rejects is not what this test asserts; only the
    // destination's abort hook matters here.
  }
  test('pipeTo preventAbort', sinkAborted, false);
}
208
/**
 * Checks the lock state around a pipe: both streams report `locked` right
 * after `pipeTo` is called, and both report unlocked once its promise settles.
 *
 * @returns {Promise<void>}
 */
async function testPipeToLocksStreams() {
  const source = new ReadableStream({
    start(controller) {
      controller.close();
    },
  });
  const sink = new WritableStream();

  // Do not await yet: the locks are inspected while the pipe is in progress.
  const pipePromise = source.pipeTo(sink);
  test('pipeTo locks source', source.locked, true);
  test('pipeTo locks dest', sink.locked, true);

  await pipePromise;
  test('pipeTo unlocks source', source.locked, false);
  test('pipeTo unlocks dest', sink.locked, false);
}
223
/**
 * Aborts a pipe from inside the sink's first `write` call and checks that the
 * `pipeTo` promise rejects, that the rejection is neither a TypeError nor an
 * error whose message mentions "AbortSignal", and that only the first chunk
 * reached the sink.
 *
 * @returns {Promise<void>}
 */
async function testPipeToWithSignal() {
  const abortController = new AbortController();
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.close();
    },
  });

  const written = [];
  const sink = new WritableStream({
    write(chunk) {
      written.push(chunk);
      abortController.abort();
    },
  });

  try {
    await source.pipeTo(sink, { signal: abortController.signal });
    test('pipeTo with signal should reject', false, true);
  } catch (e) {
    test('pipeTo aborted by signal', true, true);
    // Same condition as "not a TypeError and no AbortSignal mention",
    // expressed as the negation of the disjunction.
    const isUnexpectedError =
      e instanceof TypeError || (e && /AbortSignal/.test(String(e.message)));
    test('pipeTo signal rejects with abort semantics', !isUnexpectedError, true);
  }
  testDeep('pipeTo signal stops further writes', written, ['a']);
}
252
/**
 * Checks that `pipeTo` rejects when it is handed a signal that was aborted
 * before the pipe started.
 *
 * @returns {Promise<void>}
 */
async function testPipeToAlreadyAbortedSignal() {
  const abortController = new AbortController();
  abortController.abort('pre-aborted');

  const source = new ReadableStream({
    start(controller) {
      controller.enqueue('x');
      controller.close();
    },
  });
  const sink = new WritableStream();
  const pipeOptions = { signal: abortController.signal };

  try {
    await source.pipeTo(sink, pipeOptions);
    test('pipeTo pre-aborted should reject', false, true);
  } catch (rejection) {
    test('pipeTo pre-aborted signal rejects', true, true);
  }
}
270
/**
 * Checks that `pipeTo` rejects when the source stream is already locked by a
 * reader.
 *
 * @returns {Promise<void>}
 */
async function testPipeToRejectsLocked() {
  const lockedSource = new ReadableStream();
  lockedSource.getReader(); // acquiring a reader locks the source
  const sink = new WritableStream();

  try {
    await lockedSource.pipeTo(sink);
    test('pipeTo locked source should reject', false, true);
  } catch (rejection) {
    test('pipeTo locked source rejects', true, true);
  }
}
282
/**
 * Pipes 1, 2, 3 through a hand-built `{ writable, readable }` pair whose
 * writable side forwards each chunk multiplied by ten to the readable side,
 * then checks that `pipeThrough` returns a ReadableStream yielding 10, 20, 30.
 *
 * @returns {Promise<void>}
 */
async function testPipeThroughBasic() {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue(1);
      controller.enqueue(2);
      controller.enqueue(3);
      controller.close();
    },
  });

  // Controller of the readable side, captured so the writable side can feed it.
  let outputController;
  const writable = new WritableStream({
    write(chunk) {
      outputController.enqueue(chunk * 10);
    },
    close() {
      outputController.close();
    },
  });
  const readable = new ReadableStream({
    start(controller) {
      outputController = controller;
    },
  });

  const piped = source.pipeThrough({ writable, readable });
  test('pipeThrough returns readable', piped instanceof ReadableStream, true);

  const reader = piped.getReader();
  const collected = [];
  for (let step = await reader.read(); !step.done; step = await reader.read()) {
    collected.push(step.value);
  }
  testDeep('pipeThrough transforms data', collected, [10, 20, 30]);
}
322
// pipeThrough() on a source that already has a reader is expected to throw.
testThrows('pipeThrough locked source throws', () => {
  const lockedSource = new ReadableStream();
  lockedSource.getReader(); // acquiring a reader locks the source
  const transform = {
    writable: new WritableStream(),
    readable: new ReadableStream(),
  };
  lockedSource.pipeThrough(transform);
});
328
// pipeThrough() called without a transform argument is expected to throw.
testThrows('pipeThrough missing transform throws', () => {
  const source = new ReadableStream();
  source.pipeThrough();
});
332
// pipeThrough() into a writable that already has a writer is expected to throw.
testThrows('pipeThrough locked writable throws', () => {
  const lockedWritable = new WritableStream();
  lockedWritable.getWriter(); // acquiring a writer locks the writable side
  const transform = { writable: lockedWritable, readable: new ReadableStream() };
  new ReadableStream().pipeThrough(transform);
});
338
// Run the async test functions one at a time, in the order listed, and print
// the summary only after the last one has settled.
for (const runTest of [
  // tee
  testTeeBasic,
  testTeePullBased,
  testTeeCancelOne,
  testTeeCancelBoth,
  // pipeTo
  testPipeToBasic,
  testPipeToClosesDestination,
  testPipeToPreventClose,
  testPipeToErrorPropagation,
  testPipeToPreventAbort,
  testPipeToLocksStreams,
  testPipeToWithSignal,
  testPipeToAlreadyAbortedSignal,
  testPipeToRejectsLocked,
  // pipeThrough
  testPipeThroughBasic,
]) {
  await runTest();
}

summary();