MIRROR: javascript for ๐Ÿœ's, a tiny runtime with big ambitions
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

revise fs stream

+83 -20
+1
include/modules/buffer.h
··· 72 72 73 73 size_t buffer_get_external_memory(void); 74 74 bool buffer_is_dataview(ant_value_t obj); 75 + bool buffer_is_binary_source(ant_value_t value); 75 76 bool buffer_source_get_bytes(ant_t *js, ant_value_t value, const uint8_t **out, size_t *len); 76 77 77 78 #endif
+12 -1
src/modules/buffer.c
··· 39 39 return js_check_brand(obj, BRAND_DATAVIEW); 40 40 } 41 41 42 + bool buffer_is_binary_source(ant_value_t value) { 43 + ant_value_t slot = 0; 44 + 45 + if (vtype(value) == T_TYPEDARRAY) return true; 46 + if (!is_object_type(value)) return false; 47 + if (buffer_is_dataview(value)) return true; 48 + 49 + slot = js_get_slot(value, SLOT_BUFFER); 50 + return vtype(slot) == T_TYPEDARRAY || vtype(slot) == T_NUM; 51 + } 52 + 42 53 bool buffer_source_get_bytes(ant_t *js, ant_value_t value, const uint8_t **out, size_t *len) { 43 54 if (out) *out = NULL; 44 55 if (len) *len = 0; 45 - if (vtype(value) != T_TYPEDARRAY && !is_object_type(value)) return false; 56 + if (!buffer_is_binary_source(value)) return false; 46 57 47 58 ant_value_t slot = js_get_slot(value, SLOT_BUFFER); 48 59 TypedArrayData *ta = (TypedArrayData *)js_gettypedarray(slot);
+45 -7
src/modules/stream.c
··· 6 6 #include "internal.h" 7 7 #include "silver/engine.h" 8 8 #include "esm/loader.h" 9 - 10 9 #include "gc/roots.h" 11 10 12 11 #include "modules/assert.h" 12 + #include "modules/buffer.h" 13 13 #include "modules/events.h" 14 14 #include "modules/stream.h" 15 15 #include "modules/symbol.h" ··· 33 33 34 34 static ant_value_t g_passthrough_proto = 0; 35 35 static ant_value_t g_passthrough_ctor = 0; 36 + 37 + static ant_value_t stream_readable_maybe_read(ant_t *js, ant_value_t stream_obj); 38 + static ant_value_t stream_readable_flush(ant_t *js, ant_value_t stream_obj); 39 + static ant_value_t stream_readable_push_value(ant_t *js, ant_value_t stream_obj, ant_value_t chunk, ant_value_t encoding); 40 + static ant_value_t stream_readable_continue_flowing(ant_t *js, ant_value_t *args, int nargs); 36 41 37 42 static ant_value_t stream_noop(ant_t *js, ant_value_t *args, int nargs) { 38 43 return js_mkundef(); ··· 134 139 ) { 135 140 ant_value_t str_val = 0; 136 141 137 - if (object_mode || is_null(chunk) || is_undefined(chunk) || vtype(chunk) == T_TYPEDARRAY) 138 - return chunk; 142 + if ( 143 + object_mode || is_null(chunk) || is_undefined(chunk) || 144 + vtype(chunk) == T_TYPEDARRAY || buffer_is_binary_source(chunk) 145 + ) return chunk; 139 146 140 147 if (vtype(chunk) == T_STR) return stream_make_buffer(js, chunk, encoding); 141 148 142 149 str_val = js_tostring_val(js, chunk); 143 150 if (is_err(str_val)) return str_val; 151 + 144 152 return stream_make_buffer(js, str_val, encoding); 145 153 } 146 154 ··· 285 293 js_set(js, state, "ended", js_false); 286 294 js_set(js, state, "endEmitted", js_false); 287 295 js_set(js, state, "flowing", js_false); 296 + js_set(js, state, "flowingReadScheduled", js_false); 288 297 js_set(js, state, "reading", js_false); 289 298 js_set(js, state, "highWaterMark", js_mknum(high_water_mark)); 290 299 js_set(js, state, "buffer", js_mkarr(js)); ··· 341 350 eventemitter_emit_args(js, stream_obj, "error", args, 1); 342 351 } 343 352 344 - static ant_value_t stream_readable_maybe_read(ant_t *js, ant_value_t stream_obj); 345 - static ant_value_t stream_readable_flush(ant_t *js, ant_value_t stream_obj); 346 - static ant_value_t stream_readable_push_value(ant_t *js, ant_value_t stream_obj, ant_value_t chunk, ant_value_t encoding); 353 + static void stream_readable_schedule_continue_flowing(ant_t *js, ant_value_t stream_obj) { 354 + ant_value_t state = stream_readable_state(js, stream_obj); 355 + 356 + if (!is_object_type(state)) return; 357 + if (!js_truthy(js, js_get(js, state, "flowing"))) return; 358 + if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return; 359 + if (js_truthy(js, js_get(js, state, "ended"))) return; 360 + if (stream_readable_buffer_len(js, stream_obj) > 0) return; 361 + if (js_truthy(js, js_get(js, state, "flowingReadScheduled"))) return; 362 + 363 + js_set(js, state, "flowingReadScheduled", js_true); 364 + stream_schedule_microtask(js, stream_readable_continue_flowing, stream_obj); 365 + } 347 366 348 367 static ant_value_t js_stream_pause(ant_t *js, ant_value_t *args, int nargs) { 349 368 ant_value_t stream_obj = stream_require_this(js, js_getthis(js), "stream"); ··· 598 617 return js_mkundef(); 599 618 } 600 619 620 + static ant_value_t stream_readable_continue_flowing(ant_t *js, ant_value_t *args, int nargs) { 621 + ant_value_t stream_obj = js_get_slot(js_getcurrentfunc(js), SLOT_DATA); 622 + ant_value_t state = stream_readable_state(js, stream_obj); 623 + 624 + if (!is_object_type(state)) return js_mkundef(); 625 + js_set(js, state, "flowingReadScheduled", js_false); 626 + 627 + if (!js_truthy(js, js_get(js, state, "flowing"))) return js_mkundef(); 628 + if (js_truthy(js, js_get(js, stream_obj, "destroyed"))) return js_mkundef(); 629 + if (js_truthy(js, js_get(js, state, "ended"))) return js_mkundef(); 630 + 631 + stream_readable_maybe_read(js, stream_obj); 632 + stream_readable_flush(js, stream_obj); 633 + 634 + return js_mkundef(); 635 + } 636 + 601 637 static ant_value_t stream_readable_flush(ant_t *js, ant_value_t stream_obj) { 602 638 ant_value_t state = stream_readable_state(js, stream_obj); 639 + bool emitted_data = false; 603 640 604 641 if (!is_object_type(state)) return js_mkundef(); 605 642 606 643 while (js_truthy(js, js_get(js, state, "flowing")) && stream_readable_buffer_len(js, stream_obj) > 0) { 607 644 ant_value_t chunk = stream_buffer_shift(js, stream_obj); 645 + emitted_data = true; 608 646 eventemitter_emit_args(js, stream_obj, "data", &chunk, 1); 609 647 } 610 648 ··· 617 655 js_set(js, stream_obj, "readableEnded", js_true); 618 656 stream_emit_named(js, stream_obj, "end"); 619 657 stream_emit_named(js, stream_obj, "close"); 620 - } 658 + } else if (emitted_data) stream_readable_schedule_continue_flowing(js, stream_obj); 621 659 622 660 return js_mkundef(); 623 661 }
+25 -12
tests/test_fs_streams.cjs
··· 1 1 const fs = require('node:fs'); 2 - const sourcePath = 'tests/.fs_stream_source.txt'; 3 - const copyPath = 'tests/.fs_stream_copy.txt'; 4 - const content = 'hello from fs streams\nline two'; 2 + const { Buffer } = require('node:buffer'); 3 + const sourcePath = '/tmp/ant_fs_stream_seq_source.bin'; 4 + const copyPath = '/tmp/ant_fs_stream_seq_copy.bin'; 5 + const content = Buffer.alloc(70000); 6 + 7 + for (let i = 0; i < content.length; i++) { 8 + content[i] = i & 255; 9 + } 10 + 11 + function sameBuffer(left, right) { 12 + if (!left || !right || left.length !== right.length) return false; 13 + for (let i = 0; i < left.length; i++) { 14 + if (left[i] !== right[i]) return false; 15 + } 16 + return true; 17 + } 5 18 6 19 try { 7 20 fs.unlinkSync(sourcePath); ··· 19 32 if (!(writer instanceof fs.WriteStream)) throw new Error('createWriteStream() did not return fs.WriteStream'); 20 33 writer.on('error', fail); 21 34 writer.on('finish', () => { 22 - const written = fs.readFileSync(sourcePath, 'utf8'); 23 - if (written !== content) fail(new Error(`unexpected write content: ${written}`)); 35 + const written = fs.readFileSync(sourcePath); 36 + if (!sameBuffer(written, content)) fail(new Error(`unexpected write content length: ${written.length}`)); 24 37 25 38 const reader = fs.createReadStream(sourcePath); 26 39 if (!(reader instanceof fs.ReadStream)) fail(new Error('createReadStream() did not return fs.ReadStream')); 27 40 28 - let readBack = ''; 41 + const readBack = []; 29 42 reader.on('error', fail); 30 43 reader.on('data', chunk => { 31 - readBack += chunk.toString(); 44 + readBack.push(chunk); 32 45 }); 33 46 reader.on('end', () => { 34 - if (readBack !== content) fail(new Error(`unexpected read content: ${readBack}`)); 47 + if (!sameBuffer(Buffer.concat(readBack), content)) fail(new Error('unexpected read content')); 35 48 36 49 const pipedReader = fs.createReadStream(sourcePath); 37 50 const pipedWriter = fs.createWriteStream(copyPath); 38 51 pipedReader.on('error', fail); 39 52 pipedWriter.on('error', fail); 40 53 pipedWriter.on('finish', () => { 41 - const copied = fs.readFileSync(copyPath, 'utf8'); 42 - if (copied !== content) fail(new Error(`unexpected piped content: ${copied}`)); 54 + const copied = fs.readFileSync(copyPath); 55 + if (!sameBuffer(copied, content)) fail(new Error(`unexpected piped content length: ${copied.length}`)); 43 56 44 57 fs.unlinkSync(sourcePath); 45 58 fs.unlinkSync(copyPath); ··· 49 62 }); 50 63 }); 51 64 52 - writer.write('hello '); 53 - writer.end('from fs streams\nline two'); 65 + writer.write(content.subarray(0, 32768)); 66 + writer.end(content.subarray(32768));