Goal
Implement the WHATWG Streams API foundation: ReadableStream, WritableStream, and TransformStream as JS-exposed classes in we-js, with their default controllers, default readers and writers, and the queuing strategies and state machines required by later issues.
Scope
- `ReadableStream`
  - Constructor: `new ReadableStream(underlyingSource?, strategy?)`
  - `underlyingSource` callbacks: `start`, `pull`, `cancel`, plus the `type` field (`undefined` → default, `'bytes'` deferred to the next issue)
  - `ReadableStreamDefaultController`: `enqueue(chunk)`, `close()`, `error(e)`, `desiredSize`
  - `ReadableStreamDefaultReader` (acquired via `getReader()`): `read()` → `Promise<{ value, done }>`, `releaseLock()`, `closed`, `cancel(reason)`
  - State: `readable` / `closed` / `errored`
  - `locked` getter
  - Async iterator (`@@asyncIterator`) on `ReadableStream` so `for await (const chunk of stream)` works
- `WritableStream`
  - Constructor: `new WritableStream(underlyingSink?, strategy?)`
  - `underlyingSink` callbacks: `start`, `write`, `close`, `abort`, plus the `type` field (reserved)
  - `WritableStreamDefaultController`: `error(e)`, `signal`
  - `WritableStreamDefaultWriter` (acquired via `getWriter()`): `write(chunk)`, `close()`, `abort(reason)`, `releaseLock()`, `desiredSize`, `ready`, `closed`
  - State: `writable` / `closing` / `closed` / `erroring` / `errored`
- `TransformStream`
  - Constructor: `new TransformStream(transformer?, writableStrategy?, readableStrategy?)`
  - `transformer` callbacks: `start`, `transform`, `flush`
  - `TransformStreamDefaultController`: `enqueue`, `terminate`, `error`, `desiredSize`
  - Exposes a paired `readable` and `writable`
- Queuing strategies
  - `CountQueuingStrategy({ highWaterMark })` with `.size = () => 1`
  - `ByteLengthQueuingStrategy({ highWaterMark })` with `.size = (chunk) => chunk.byteLength`
- Internal queue with `desiredSize = highWaterMark - currentSize` semantics
- Microtask scheduling: pulls and writes run via the existing microtask queue from Phase 10
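
The `desiredSize = highWaterMark - currentSize` rule and the two strategy `size` functions can be modeled in a few lines of JavaScript. This is a minimal sketch for illustration, not the we-js implementation: `SizedQueue`, `countStrategy`, and `byteLengthStrategy` are hypothetical names, and the strategies are plain objects that only mirror the `size` functions listed above.

```javascript
// Hypothetical model of the internal queue; names are illustrative only.
class SizedQueue {
  constructor({ highWaterMark, size }) {
    this.highWaterMark = highWaterMark;
    this.sizeOf = size;
    this.entries = [];
    this.currentSize = 0;
  }

  // Record the chunk together with the size the strategy assigns to it.
  enqueue(chunk) {
    const size = this.sizeOf(chunk);
    this.entries.push({ chunk, size });
    this.currentSize += size;
  }

  // Remove the oldest chunk and give its size back to the budget.
  dequeue() {
    if (this.entries.length === 0) throw new RangeError('queue is empty');
    const { chunk, size } = this.entries.shift();
    this.currentSize -= size;
    return chunk;
  }

  get desiredSize() {
    return this.highWaterMark - this.currentSize;
  }
}

// Plain-object stand-ins for CountQueuingStrategy and ByteLengthQueuingStrategy.
const countStrategy = (highWaterMark) => ({ highWaterMark, size: () => 1 });
const byteLengthStrategy = (highWaterMark) => ({
  highWaterMark,
  size: (chunk) => chunk.byteLength,
});

const counted = new SizedQueue(countStrategy(2));
counted.enqueue('a');
counted.enqueue('b');
counted.enqueue('c');
console.log(counted.desiredSize); // -1

const bytes = new SizedQueue(byteLengthStrategy(16));
bytes.enqueue(new Uint8Array(10));
console.log(bytes.desiredSize); // 6
```

A `desiredSize` of zero or below means the producer has reached or overshot the high water mark and should hold off until chunks are dequeued.
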
Acceptance Criteria
- `new ReadableStream(...).getReader().read()` returns a thenable that resolves to `{ value, done }` matching the WHATWG semantics
- Backpressure: `controller.desiredSize` decreases as chunks are enqueued, and `pull()` is invoked while `desiredSize > 0` (that is, while the queue is below the high water mark)
- Errored streams reject pending reads/writes and subsequent `read()` / `write()` calls
- `writable.getWriter().write(chunk)` returns a promise that resolves when the underlying sink accepts the chunk (respecting backpressure via `ready`)
- `TransformStream` paired streams chain backpressure correctly (writable `ready` depends on readable buffer level)
- Async iteration over a `ReadableStream` yields enqueued chunks and terminates on close
- Unit tests covering: default reader read/close/error/cancel, default writer write/close/abort, queuing strategies, transform stream identity, error propagation, locking, async iteration
- Conventions
  - `cargo fmt --all --check`
  - `cargo clippy --workspace -- -D warnings`
  - `cargo test --workspace`
  - No external crate dependencies; `unsafe` only inside `js` if strictly needed for GC integration
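
Several of the behavioral criteria above can be exercised from script. The following is a sketch of such a check, assuming a runtime that already exposes the standard stream globals (for example a recent browser or Node.js 18+). `observed`, `collect`, and `writeAll` are hypothetical helper names, not part of the spec or of we-js.

```javascript
// Backpressure: desiredSize shrinks as chunks are enqueued.
const observed = [];
const source = new ReadableStream(
  {
    // start() runs synchronously inside the constructor.
    start(controller) {
      observed.push(controller.desiredSize);
      controller.enqueue('a');
      observed.push(controller.desiredSize);
      controller.enqueue('b');
      observed.push(controller.desiredSize);
      controller.close();
    },
  },
  new CountQueuingStrategy({ highWaterMark: 2 }),
);
console.log(observed); // [ 2, 1, 0 ]

// Async iteration: yields enqueued chunks and ends when the stream closes.
async function collect(stream) {
  const chunks = [];
  for await (const chunk of stream) chunks.push(chunk);
  return chunks;
}

// Writer: wait on `ready` before each write so backpressure is respected.
const written = [];
const sink = new WritableStream({
  write(chunk) {
    written.push(chunk);
  },
});

async function writeAll(writable, chunks) {
  const writer = writable.getWriter();
  for (const chunk of chunks) {
    await writer.ready;
    await writer.write(chunk);
  }
  await writer.close();
}

collect(source)
  .then((chunks) => writeAll(sink, chunks))
  .then(() => console.log(written));
```

Awaiting `writer.ready` before each `write()` is what lets a slow sink throttle the producer; dropping that line would still work here but would queue chunks without regard to the high water mark.
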