| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290 |
- // Copyright Joyent, Inc. and other Node contributors.
- //
- // Permission is hereby granted, free of charge, to any person obtaining a
- // copy of this software and associated documentation files (the
- // "Software"), to deal in the Software without restriction, including
- // without limitation the rights to use, copy, modify, merge, publish,
- // distribute, sublicense, and/or sell copies of the Software, and to permit
- // persons to whom the Software is furnished to do so, subject to the
- // following conditions:
- //
- // The above copyright notice and this permission notice shall be included
- // in all copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
- // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
- // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
- // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
- // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
- // USE OR OTHER DEALINGS IN THE SOFTWARE.
- 'use strict'
- /* replacement start */
- const process = require('process/')
- /* replacement end */
- const {
- ArrayPrototypeIndexOf,
- NumberIsInteger,
- NumberIsNaN,
- NumberParseInt,
- ObjectDefineProperties,
- ObjectKeys,
- ObjectSetPrototypeOf,
- Promise,
- SafeSet,
- SymbolAsyncDispose,
- SymbolAsyncIterator,
- Symbol
- } = require('../../ours/primordials')
- module.exports = Readable
- Readable.ReadableState = ReadableState
- const { EventEmitter: EE } = require('events')
- const { Stream, prependListener } = require('./legacy')
- const { Buffer } = require('buffer')
- const { addAbortSignal } = require('./add-abort-signal')
- const eos = require('./end-of-stream')
- let debug = require('../../ours/util').debuglog('stream', (fn) => {
- debug = fn
- })
- const BufferList = require('./buffer_list')
- const destroyImpl = require('./destroy')
- const { getHighWaterMark, getDefaultHighWaterMark } = require('./state')
- const {
- aggregateTwoErrors,
- codes: {
- ERR_INVALID_ARG_TYPE,
- ERR_METHOD_NOT_IMPLEMENTED,
- ERR_OUT_OF_RANGE,
- ERR_STREAM_PUSH_AFTER_EOF,
- ERR_STREAM_UNSHIFT_AFTER_END_EVENT
- },
- AbortError
- } = require('../../ours/errors')
- const { validateObject } = require('../validators')
- const kPaused = Symbol('kPaused')
- const { StringDecoder } = require('string_decoder/')
- const from = require('./from')
- ObjectSetPrototypeOf(Readable.prototype, Stream.prototype)
- ObjectSetPrototypeOf(Readable, Stream)
- const nop = () => {}
- const { errorOrDestroy } = destroyImpl
- const kObjectMode = 1 << 0
- const kEnded = 1 << 1
- const kEndEmitted = 1 << 2
- const kReading = 1 << 3
- const kConstructed = 1 << 4
- const kSync = 1 << 5
- const kNeedReadable = 1 << 6
- const kEmittedReadable = 1 << 7
- const kReadableListening = 1 << 8
- const kResumeScheduled = 1 << 9
- const kErrorEmitted = 1 << 10
- const kEmitClose = 1 << 11
- const kAutoDestroy = 1 << 12
- const kDestroyed = 1 << 13
- const kClosed = 1 << 14
- const kCloseEmitted = 1 << 15
- const kMultiAwaitDrain = 1 << 16
- const kReadingMore = 1 << 17
- const kDataEmitted = 1 << 18
- // TODO(benjamingr) it is likely slower to do it this way than with free functions
- function makeBitMapDescriptor(bit) {
- return {
- enumerable: false,
- get() {
- return (this.state & bit) !== 0
- },
- set(value) {
- if (value) this.state |= bit
- else this.state &= ~bit
- }
- }
- }
- ObjectDefineProperties(ReadableState.prototype, {
- objectMode: makeBitMapDescriptor(kObjectMode),
- ended: makeBitMapDescriptor(kEnded),
- endEmitted: makeBitMapDescriptor(kEndEmitted),
- reading: makeBitMapDescriptor(kReading),
- // Stream is still being constructed and cannot be
- // destroyed until construction finished or failed.
- // Async construction is opt in, therefore we start as
- // constructed.
- constructed: makeBitMapDescriptor(kConstructed),
- // A flag to be able to tell if the event 'readable'/'data' is emitted
- // immediately, or on a later tick. We set this to true at first, because
- // any actions that shouldn't happen until "later" should generally also
- // not happen before the first read call.
- sync: makeBitMapDescriptor(kSync),
- // Whenever we return null, then we set a flag to say
- // that we're awaiting a 'readable' event emission.
- needReadable: makeBitMapDescriptor(kNeedReadable),
- emittedReadable: makeBitMapDescriptor(kEmittedReadable),
- readableListening: makeBitMapDescriptor(kReadableListening),
- resumeScheduled: makeBitMapDescriptor(kResumeScheduled),
- // True if the error was already emitted and should not be thrown again.
- errorEmitted: makeBitMapDescriptor(kErrorEmitted),
- emitClose: makeBitMapDescriptor(kEmitClose),
- autoDestroy: makeBitMapDescriptor(kAutoDestroy),
- // Has it been destroyed.
- destroyed: makeBitMapDescriptor(kDestroyed),
- // Indicates whether the stream has finished destroying.
- closed: makeBitMapDescriptor(kClosed),
- // True if close has been emitted or would have been emitted
- // depending on emitClose.
- closeEmitted: makeBitMapDescriptor(kCloseEmitted),
- multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain),
- // If true, a maybeReadMore has been scheduled.
- readingMore: makeBitMapDescriptor(kReadingMore),
- dataEmitted: makeBitMapDescriptor(kDataEmitted)
- })
- function ReadableState(options, stream, isDuplex) {
- // Duplex streams are both readable and writable, but share
- // the same options object.
- // However, some cases require setting options to different
- // values for the readable and the writable sides of the duplex stream.
- // These options can be provided separately as readableXXX and writableXXX.
- if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex')
- // Bit map field to store ReadableState more effciently with 1 bit per field
- // instead of a V8 slot per field.
- this.state = kEmitClose | kAutoDestroy | kConstructed | kSync
- // Object stream flag. Used to make read(n) ignore n and to
- // make all the buffer merging and length checks go away.
- if (options && options.objectMode) this.state |= kObjectMode
- if (isDuplex && options && options.readableObjectMode) this.state |= kObjectMode
- // The point at which it stops calling _read() to fill the buffer
- // Note: 0 is a valid value, means "don't call _read preemptively ever"
- this.highWaterMark = options
- ? getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex)
- : getDefaultHighWaterMark(false)
- // A linked list is used to store data chunks instead of an array because the
- // linked list can remove elements from the beginning faster than
- // array.shift().
- this.buffer = new BufferList()
- this.length = 0
- this.pipes = []
- this.flowing = null
- this[kPaused] = null
- // Should close be emitted on destroy. Defaults to true.
- if (options && options.emitClose === false) this.state &= ~kEmitClose
- // Should .destroy() be called after 'end' (and potentially 'finish').
- if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy
- // Indicates whether the stream has errored. When true no further
- // _read calls, 'data' or 'readable' events should occur. This is needed
- // since when autoDestroy is disabled we need a way to tell whether the
- // stream has failed.
- this.errored = null
- // Crypto is kind of old and crusty. Historically, its default string
- // encoding is 'binary' so we have to make this configurable.
- // Everything else in the universe uses 'utf8', though.
- this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'
- // Ref the piped dest which we need a drain event on it
- // type: null | Writable | Set<Writable>.
- this.awaitDrainWriters = null
- this.decoder = null
- this.encoding = null
- if (options && options.encoding) {
- this.decoder = new StringDecoder(options.encoding)
- this.encoding = options.encoding
- }
- }
- function Readable(options) {
- if (!(this instanceof Readable)) return new Readable(options)
- // Checking for a Stream.Duplex instance is faster here instead of inside
- // the ReadableState constructor, at least with V8 6.5.
- const isDuplex = this instanceof require('./duplex')
- this._readableState = new ReadableState(options, this, isDuplex)
- if (options) {
- if (typeof options.read === 'function') this._read = options.read
- if (typeof options.destroy === 'function') this._destroy = options.destroy
- if (typeof options.construct === 'function') this._construct = options.construct
- if (options.signal && !isDuplex) addAbortSignal(options.signal, this)
- }
- Stream.call(this, options)
- destroyImpl.construct(this, () => {
- if (this._readableState.needReadable) {
- maybeReadMore(this, this._readableState)
- }
- })
- }
- Readable.prototype.destroy = destroyImpl.destroy
- Readable.prototype._undestroy = destroyImpl.undestroy
- Readable.prototype._destroy = function (err, cb) {
- cb(err)
- }
- Readable.prototype[EE.captureRejectionSymbol] = function (err) {
- this.destroy(err)
- }
- Readable.prototype[SymbolAsyncDispose] = function () {
- let error
- if (!this.destroyed) {
- error = this.readableEnded ? null : new AbortError()
- this.destroy(error)
- }
- return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null))))
- }
- // Manually shove something into the read() buffer.
- // This returns true if the highWaterMark has not been hit yet,
- // similar to how Writable.write() returns true if you should
- // write() some more.
- Readable.prototype.push = function (chunk, encoding) {
- return readableAddChunk(this, chunk, encoding, false)
- }
- // Unshift should *always* be something directly out of read().
- Readable.prototype.unshift = function (chunk, encoding) {
- return readableAddChunk(this, chunk, encoding, true)
- }
- function readableAddChunk(stream, chunk, encoding, addToFront) {
- debug('readableAddChunk', chunk)
- const state = stream._readableState
- let err
- if ((state.state & kObjectMode) === 0) {
- if (typeof chunk === 'string') {
- encoding = encoding || state.defaultEncoding
- if (state.encoding !== encoding) {
- if (addToFront && state.encoding) {
- // When unshifting, if state.encoding is set, we have to save
- // the string in the BufferList with the state encoding.
- chunk = Buffer.from(chunk, encoding).toString(state.encoding)
- } else {
- chunk = Buffer.from(chunk, encoding)
- encoding = ''
- }
- }
- } else if (chunk instanceof Buffer) {
- encoding = ''
- } else if (Stream._isUint8Array(chunk)) {
- chunk = Stream._uint8ArrayToBuffer(chunk)
- encoding = ''
- } else if (chunk != null) {
- err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk)
- }
- }
- if (err) {
- errorOrDestroy(stream, err)
- } else if (chunk === null) {
- state.state &= ~kReading
- onEofChunk(stream, state)
- } else if ((state.state & kObjectMode) !== 0 || (chunk && chunk.length > 0)) {
- if (addToFront) {
- if ((state.state & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT())
- else if (state.destroyed || state.errored) return false
- else addChunk(stream, state, chunk, true)
- } else if (state.ended) {
- errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF())
- } else if (state.destroyed || state.errored) {
- return false
- } else {
- state.state &= ~kReading
- if (state.decoder && !encoding) {
- chunk = state.decoder.write(chunk)
- if (state.objectMode || chunk.length !== 0) addChunk(stream, state, chunk, false)
- else maybeReadMore(stream, state)
- } else {
- addChunk(stream, state, chunk, false)
- }
- }
- } else if (!addToFront) {
- state.state &= ~kReading
- maybeReadMore(stream, state)
- }
- // We can push more data if we are below the highWaterMark.
- // Also, if we have no data yet, we can stand some more bytes.
- // This is to work around cases where hwm=0, such as the repl.
- return !state.ended && (state.length < state.highWaterMark || state.length === 0)
- }
- function addChunk(stream, state, chunk, addToFront) {
- if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) {
- // Use the guard to avoid creating `Set()` repeatedly
- // when we have multiple pipes.
- if ((state.state & kMultiAwaitDrain) !== 0) {
- state.awaitDrainWriters.clear()
- } else {
- state.awaitDrainWriters = null
- }
- state.dataEmitted = true
- stream.emit('data', chunk)
- } else {
- // Update the buffer info.
- state.length += state.objectMode ? 1 : chunk.length
- if (addToFront) state.buffer.unshift(chunk)
- else state.buffer.push(chunk)
- if ((state.state & kNeedReadable) !== 0) emitReadable(stream)
- }
- maybeReadMore(stream, state)
- }
- Readable.prototype.isPaused = function () {
- const state = this._readableState
- return state[kPaused] === true || state.flowing === false
- }
- // Backwards compatibility.
- Readable.prototype.setEncoding = function (enc) {
- const decoder = new StringDecoder(enc)
- this._readableState.decoder = decoder
- // If setEncoding(null), decoder.encoding equals utf8.
- this._readableState.encoding = this._readableState.decoder.encoding
- const buffer = this._readableState.buffer
- // Iterate over current buffer to convert already stored Buffers:
- let content = ''
- for (const data of buffer) {
- content += decoder.write(data)
- }
- buffer.clear()
- if (content !== '') buffer.push(content)
- this._readableState.length = content.length
- return this
- }
- // Don't raise the hwm > 1GB.
- const MAX_HWM = 0x40000000
- function computeNewHighWaterMark(n) {
- if (n > MAX_HWM) {
- throw new ERR_OUT_OF_RANGE('size', '<= 1GiB', n)
- } else {
- // Get the next highest power of 2 to prevent increasing hwm excessively in
- // tiny amounts.
- n--
- n |= n >>> 1
- n |= n >>> 2
- n |= n >>> 4
- n |= n >>> 8
- n |= n >>> 16
- n++
- }
- return n
- }
- // This function is designed to be inlinable, so please take care when making
- // changes to the function body.
- function howMuchToRead(n, state) {
- if (n <= 0 || (state.length === 0 && state.ended)) return 0
- if ((state.state & kObjectMode) !== 0) return 1
- if (NumberIsNaN(n)) {
- // Only flow one buffer at a time.
- if (state.flowing && state.length) return state.buffer.first().length
- return state.length
- }
- if (n <= state.length) return n
- return state.ended ? state.length : 0
- }
- // You can override either this method, or the async _read(n) below.
- Readable.prototype.read = function (n) {
- debug('read', n)
- // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
- // in this scenario, so we are doing it manually.
- if (n === undefined) {
- n = NaN
- } else if (!NumberIsInteger(n)) {
- n = NumberParseInt(n, 10)
- }
- const state = this._readableState
- const nOrig = n
- // If we're asking for more than the current hwm, then raise the hwm.
- if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n)
- if (n !== 0) state.state &= ~kEmittedReadable
- // If we're doing read(0) to trigger a readable event, but we
- // already have a bunch of data in the buffer, then just trigger
- // the 'readable' event and move on.
- if (
- n === 0 &&
- state.needReadable &&
- ((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || state.ended)
- ) {
- debug('read: emitReadable', state.length, state.ended)
- if (state.length === 0 && state.ended) endReadable(this)
- else emitReadable(this)
- return null
- }
- n = howMuchToRead(n, state)
- // If we've ended, and we're now clear, then finish it up.
- if (n === 0 && state.ended) {
- if (state.length === 0) endReadable(this)
- return null
- }
- // All the actual chunk generation logic needs to be
- // *below* the call to _read. The reason is that in certain
- // synthetic stream cases, such as passthrough streams, _read
- // may be a completely synchronous operation which may change
- // the state of the read buffer, providing enough data when
- // before there was *not* enough.
- //
- // So, the steps are:
- // 1. Figure out what the state of things will be after we do
- // a read from the buffer.
- //
- // 2. If that resulting state will trigger a _read, then call _read.
- // Note that this may be asynchronous, or synchronous. Yes, it is
- // deeply ugly to write APIs this way, but that still doesn't mean
- // that the Readable class should behave improperly, as streams are
- // designed to be sync/async agnostic.
- // Take note if the _read call is sync or async (ie, if the read call
- // has returned yet), so that we know whether or not it's safe to emit
- // 'readable' etc.
- //
- // 3. Actually pull the requested chunks out of the buffer and return.
- // if we need a readable event, then we need to do some reading.
- let doRead = (state.state & kNeedReadable) !== 0
- debug('need readable', doRead)
- // If we currently have less than the highWaterMark, then also read some.
- if (state.length === 0 || state.length - n < state.highWaterMark) {
- doRead = true
- debug('length less than watermark', doRead)
- }
- // However, if we've ended, then there's no point, if we're already
- // reading, then it's unnecessary, if we're constructing we have to wait,
- // and if we're destroyed or errored, then it's not allowed,
- if (state.ended || state.reading || state.destroyed || state.errored || !state.constructed) {
- doRead = false
- debug('reading, ended or constructing', doRead)
- } else if (doRead) {
- debug('do read')
- state.state |= kReading | kSync
- // If the length is currently zero, then we *need* a readable event.
- if (state.length === 0) state.state |= kNeedReadable
- // Call internal read method
- try {
- this._read(state.highWaterMark)
- } catch (err) {
- errorOrDestroy(this, err)
- }
- state.state &= ~kSync
- // If _read pushed data synchronously, then `reading` will be false,
- // and we need to re-evaluate how much data we can return to the user.
- if (!state.reading) n = howMuchToRead(nOrig, state)
- }
- let ret
- if (n > 0) ret = fromList(n, state)
- else ret = null
- if (ret === null) {
- state.needReadable = state.length <= state.highWaterMark
- n = 0
- } else {
- state.length -= n
- if (state.multiAwaitDrain) {
- state.awaitDrainWriters.clear()
- } else {
- state.awaitDrainWriters = null
- }
- }
- if (state.length === 0) {
- // If we have nothing in the buffer, then we want to know
- // as soon as we *do* get something into the buffer.
- if (!state.ended) state.needReadable = true
- // If we tried to read() past the EOF, then emit end on the next tick.
- if (nOrig !== n && state.ended) endReadable(this)
- }
- if (ret !== null && !state.errorEmitted && !state.closeEmitted) {
- state.dataEmitted = true
- this.emit('data', ret)
- }
- return ret
- }
- function onEofChunk(stream, state) {
- debug('onEofChunk')
- if (state.ended) return
- if (state.decoder) {
- const chunk = state.decoder.end()
- if (chunk && chunk.length) {
- state.buffer.push(chunk)
- state.length += state.objectMode ? 1 : chunk.length
- }
- }
- state.ended = true
- if (state.sync) {
- // If we are sync, wait until next tick to emit the data.
- // Otherwise we risk emitting data in the flow()
- // the readable code triggers during a read() call.
- emitReadable(stream)
- } else {
- // Emit 'readable' now to make sure it gets picked up.
- state.needReadable = false
- state.emittedReadable = true
- // We have to emit readable now that we are EOF. Modules
- // in the ecosystem (e.g. dicer) rely on this event being sync.
- emitReadable_(stream)
- }
- }
- // Don't emit readable right away in sync mode, because this can trigger
- // another read() call => stack overflow. This way, it might trigger
- // a nextTick recursion warning, but that's not so bad.
- function emitReadable(stream) {
- const state = stream._readableState
- debug('emitReadable', state.needReadable, state.emittedReadable)
- state.needReadable = false
- if (!state.emittedReadable) {
- debug('emitReadable', state.flowing)
- state.emittedReadable = true
- process.nextTick(emitReadable_, stream)
- }
- }
- function emitReadable_(stream) {
- const state = stream._readableState
- debug('emitReadable_', state.destroyed, state.length, state.ended)
- if (!state.destroyed && !state.errored && (state.length || state.ended)) {
- stream.emit('readable')
- state.emittedReadable = false
- }
- // The stream needs another readable event if:
- // 1. It is not flowing, as the flow mechanism will take
- // care of it.
- // 2. It is not ended.
- // 3. It is below the highWaterMark, so we can schedule
- // another readable later.
- state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark
- flow(stream)
- }
- // At this point, the user has presumably seen the 'readable' event,
- // and called read() to consume some data. that may have triggered
- // in turn another _read(n) call, in which case reading = true if
- // it's in progress.
- // However, if we're not ended, or reading, and the length < hwm,
- // then go ahead and try to read some more preemptively.
- function maybeReadMore(stream, state) {
- if (!state.readingMore && state.constructed) {
- state.readingMore = true
- process.nextTick(maybeReadMore_, stream, state)
- }
- }
- function maybeReadMore_(stream, state) {
- // Attempt to read more data if we should.
- //
- // The conditions for reading more data are (one of):
- // - Not enough data buffered (state.length < state.highWaterMark). The loop
- // is responsible for filling the buffer with enough data if such data
- // is available. If highWaterMark is 0 and we are not in the flowing mode
- // we should _not_ attempt to buffer any extra data. We'll get more data
- // when the stream consumer calls read() instead.
- // - No data in the buffer, and the stream is in flowing mode. In this mode
- // the loop below is responsible for ensuring read() is called. Failing to
- // call read here would abort the flow and there's no other mechanism for
- // continuing the flow if the stream consumer has just subscribed to the
- // 'data' event.
- //
- // In addition to the above conditions to keep reading data, the following
- // conditions prevent the data from being read:
- // - The stream has ended (state.ended).
- // - There is already a pending 'read' operation (state.reading). This is a
- // case where the stream has called the implementation defined _read()
- // method, but they are processing the call asynchronously and have _not_
- // called push() with new data. In this case we skip performing more
- // read()s. The execution ends in this method again after the _read() ends
- // up calling push() with more data.
- while (
- !state.reading &&
- !state.ended &&
- (state.length < state.highWaterMark || (state.flowing && state.length === 0))
- ) {
- const len = state.length
- debug('maybeReadMore read 0')
- stream.read(0)
- if (len === state.length)
- // Didn't get any data, stop spinning.
- break
- }
- state.readingMore = false
- }
- // Abstract method. to be overridden in specific implementation classes.
- // call cb(er, data) where data is <= n in length.
- // for virtual (non-string, non-buffer) streams, "length" is somewhat
- // arbitrary, and perhaps not very meaningful.
- Readable.prototype._read = function (n) {
- throw new ERR_METHOD_NOT_IMPLEMENTED('_read()')
- }
- Readable.prototype.pipe = function (dest, pipeOpts) {
- const src = this
- const state = this._readableState
- if (state.pipes.length === 1) {
- if (!state.multiAwaitDrain) {
- state.multiAwaitDrain = true
- state.awaitDrainWriters = new SafeSet(state.awaitDrainWriters ? [state.awaitDrainWriters] : [])
- }
- }
- state.pipes.push(dest)
- debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts)
- const doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr
- const endFn = doEnd ? onend : unpipe
- if (state.endEmitted) process.nextTick(endFn)
- else src.once('end', endFn)
- dest.on('unpipe', onunpipe)
- function onunpipe(readable, unpipeInfo) {
- debug('onunpipe')
- if (readable === src) {
- if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
- unpipeInfo.hasUnpiped = true
- cleanup()
- }
- }
- }
- function onend() {
- debug('onend')
- dest.end()
- }
- let ondrain
- let cleanedUp = false
- function cleanup() {
- debug('cleanup')
- // Cleanup event handlers once the pipe is broken.
- dest.removeListener('close', onclose)
- dest.removeListener('finish', onfinish)
- if (ondrain) {
- dest.removeListener('drain', ondrain)
- }
- dest.removeListener('error', onerror)
- dest.removeListener('unpipe', onunpipe)
- src.removeListener('end', onend)
- src.removeListener('end', unpipe)
- src.removeListener('data', ondata)
- cleanedUp = true
- // If the reader is waiting for a drain event from this
- // specific writer, then it would cause it to never start
- // flowing again.
- // So, if this is awaiting a drain, then we just call it now.
- // If we don't know, then assume that we are waiting for one.
- if (ondrain && state.awaitDrainWriters && (!dest._writableState || dest._writableState.needDrain)) ondrain()
- }
- function pause() {
- // If the user unpiped during `dest.write()`, it is possible
- // to get stuck in a permanently paused state if that write
- // also returned false.
- // => Check whether `dest` is still a piping destination.
- if (!cleanedUp) {
- if (state.pipes.length === 1 && state.pipes[0] === dest) {
- debug('false write response, pause', 0)
- state.awaitDrainWriters = dest
- state.multiAwaitDrain = false
- } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
- debug('false write response, pause', state.awaitDrainWriters.size)
- state.awaitDrainWriters.add(dest)
- }
- src.pause()
- }
- if (!ondrain) {
- // When the dest drains, it reduces the awaitDrain counter
- // on the source. This would be more elegant with a .once()
- // handler in flow(), but adding and removing repeatedly is
- // too slow.
- ondrain = pipeOnDrain(src, dest)
- dest.on('drain', ondrain)
- }
- }
- src.on('data', ondata)
- function ondata(chunk) {
- debug('ondata')
- const ret = dest.write(chunk)
- debug('dest.write', ret)
- if (ret === false) {
- pause()
- }
- }
- // If the dest has an error, then stop piping into it.
- // However, don't suppress the throwing behavior for this.
- function onerror(er) {
- debug('onerror', er)
- unpipe()
- dest.removeListener('error', onerror)
- if (dest.listenerCount('error') === 0) {
- const s = dest._writableState || dest._readableState
- if (s && !s.errorEmitted) {
- // User incorrectly emitted 'error' directly on the stream.
- errorOrDestroy(dest, er)
- } else {
- dest.emit('error', er)
- }
- }
- }
- // Make sure our error handler is attached before userland ones.
- prependListener(dest, 'error', onerror)
- // Both close and finish should trigger unpipe, but only once.
- function onclose() {
- dest.removeListener('finish', onfinish)
- unpipe()
- }
- dest.once('close', onclose)
- function onfinish() {
- debug('onfinish')
- dest.removeListener('close', onclose)
- unpipe()
- }
- dest.once('finish', onfinish)
- function unpipe() {
- debug('unpipe')
- src.unpipe(dest)
- }
- // Tell the dest that it's being piped to.
- dest.emit('pipe', src)
- // Start the flow if it hasn't been started already.
- if (dest.writableNeedDrain === true) {
- pause()
- } else if (!state.flowing) {
- debug('pipe resume')
- src.resume()
- }
- return dest
- }
- function pipeOnDrain(src, dest) {
- return function pipeOnDrainFunctionResult() {
- const state = src._readableState
- // `ondrain` will call directly,
- // `this` maybe not a reference to dest,
- // so we use the real dest here.
- if (state.awaitDrainWriters === dest) {
- debug('pipeOnDrain', 1)
- state.awaitDrainWriters = null
- } else if (state.multiAwaitDrain) {
- debug('pipeOnDrain', state.awaitDrainWriters.size)
- state.awaitDrainWriters.delete(dest)
- }
- if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && src.listenerCount('data')) {
- src.resume()
- }
- }
- }
- Readable.prototype.unpipe = function (dest) {
- const state = this._readableState
- const unpipeInfo = {
- hasUnpiped: false
- }
- // If we're not piping anywhere, then do nothing.
- if (state.pipes.length === 0) return this
- if (!dest) {
- // remove all.
- const dests = state.pipes
- state.pipes = []
- this.pause()
- for (let i = 0; i < dests.length; i++)
- dests[i].emit('unpipe', this, {
- hasUnpiped: false
- })
- return this
- }
- // Try to find the right one.
- const index = ArrayPrototypeIndexOf(state.pipes, dest)
- if (index === -1) return this
- state.pipes.splice(index, 1)
- if (state.pipes.length === 0) this.pause()
- dest.emit('unpipe', this, unpipeInfo)
- return this
- }
- // Set up data events if they are asked for
- // Ensure readable listeners eventually get something.
- Readable.prototype.on = function (ev, fn) {
- const res = Stream.prototype.on.call(this, ev, fn)
- const state = this._readableState
- if (ev === 'data') {
- // Update readableListening so that resume() may be a no-op
- // a few lines down. This is needed to support once('readable').
- state.readableListening = this.listenerCount('readable') > 0
- // Try start flowing on next tick if stream isn't explicitly paused.
- if (state.flowing !== false) this.resume()
- } else if (ev === 'readable') {
- if (!state.endEmitted && !state.readableListening) {
- state.readableListening = state.needReadable = true
- state.flowing = false
- state.emittedReadable = false
- debug('on readable', state.length, state.reading)
- if (state.length) {
- emitReadable(this)
- } else if (!state.reading) {
- process.nextTick(nReadingNextTick, this)
- }
- }
- }
- return res
- }
- Readable.prototype.addListener = Readable.prototype.on
- Readable.prototype.removeListener = function (ev, fn) {
- const res = Stream.prototype.removeListener.call(this, ev, fn)
- if (ev === 'readable') {
- // We need to check if there is someone still listening to
- // readable and reset the state. However this needs to happen
- // after readable has been emitted but before I/O (nextTick) to
- // support once('readable', fn) cycles. This means that calling
- // resume within the same tick will have no
- // effect.
- process.nextTick(updateReadableListening, this)
- }
- return res
- }
- Readable.prototype.off = Readable.prototype.removeListener
- Readable.prototype.removeAllListeners = function (ev) {
- const res = Stream.prototype.removeAllListeners.apply(this, arguments)
- if (ev === 'readable' || ev === undefined) {
- // We need to check if there is someone still listening to
- // readable and reset the state. However this needs to happen
- // after readable has been emitted but before I/O (nextTick) to
- // support once('readable', fn) cycles. This means that calling
- // resume within the same tick will have no
- // effect.
- process.nextTick(updateReadableListening, this)
- }
- return res
- }
- function updateReadableListening(self) {
- const state = self._readableState
- state.readableListening = self.listenerCount('readable') > 0
- if (state.resumeScheduled && state[kPaused] === false) {
- // Flowing needs to be set to true now, otherwise
- // the upcoming resume will not flow.
- state.flowing = true
- // Crude way to check if we should resume.
- } else if (self.listenerCount('data') > 0) {
- self.resume()
- } else if (!state.readableListening) {
- state.flowing = null
- }
- }
- function nReadingNextTick(self) {
- debug('readable nexttick read 0')
- self.read(0)
- }
- // pause() and resume() are remnants of the legacy readable stream API
- // If the user uses them, then switch into old mode.
- Readable.prototype.resume = function () {
- const state = this._readableState
- if (!state.flowing) {
- debug('resume')
- // We flow only if there is no one listening
- // for readable, but we still have to call
- // resume().
- state.flowing = !state.readableListening
- resume(this, state)
- }
- state[kPaused] = false
- return this
- }
- function resume(stream, state) {
- if (!state.resumeScheduled) {
- state.resumeScheduled = true
- process.nextTick(resume_, stream, state)
- }
- }
- function resume_(stream, state) {
- debug('resume', state.reading)
- if (!state.reading) {
- stream.read(0)
- }
- state.resumeScheduled = false
- stream.emit('resume')
- flow(stream)
- if (state.flowing && !state.reading) stream.read(0)
- }
- Readable.prototype.pause = function () {
- debug('call pause flowing=%j', this._readableState.flowing)
- if (this._readableState.flowing !== false) {
- debug('pause')
- this._readableState.flowing = false
- this.emit('pause')
- }
- this._readableState[kPaused] = true
- return this
- }
- function flow(stream) {
- const state = stream._readableState
- debug('flow', state.flowing)
- while (state.flowing && stream.read() !== null);
- }
- // Wrap an old-style stream as the async data source.
- // This is *not* part of the readable stream interface.
- // It is an ugly unfortunate mess of history.
- Readable.prototype.wrap = function (stream) {
- let paused = false
- // TODO (ronag): Should this.destroy(err) emit
- // 'error' on the wrapped stream? Would require
- // a static factory method, e.g. Readable.wrap(stream).
- stream.on('data', (chunk) => {
- if (!this.push(chunk) && stream.pause) {
- paused = true
- stream.pause()
- }
- })
- stream.on('end', () => {
- this.push(null)
- })
- stream.on('error', (err) => {
- errorOrDestroy(this, err)
- })
- stream.on('close', () => {
- this.destroy()
- })
- stream.on('destroy', () => {
- this.destroy()
- })
- this._read = () => {
- if (paused && stream.resume) {
- paused = false
- stream.resume()
- }
- }
- // Proxy all the other methods. Important when wrapping filters and duplexes.
- const streamKeys = ObjectKeys(stream)
- for (let j = 1; j < streamKeys.length; j++) {
- const i = streamKeys[j]
- if (this[i] === undefined && typeof stream[i] === 'function') {
- this[i] = stream[i].bind(stream)
- }
- }
- return this
- }
- Readable.prototype[SymbolAsyncIterator] = function () {
- return streamToAsyncIterator(this)
- }
- Readable.prototype.iterator = function (options) {
- if (options !== undefined) {
- validateObject(options, 'options')
- }
- return streamToAsyncIterator(this, options)
- }
- function streamToAsyncIterator(stream, options) {
- if (typeof stream.read !== 'function') {
- stream = Readable.wrap(stream, {
- objectMode: true
- })
- }
- const iter = createAsyncIterator(stream, options)
- iter.stream = stream
- return iter
- }
- async function* createAsyncIterator(stream, options) {
- let callback = nop
- function next(resolve) {
- if (this === stream) {
- callback()
- callback = nop
- } else {
- callback = resolve
- }
- }
- stream.on('readable', next)
- let error
- const cleanup = eos(
- stream,
- {
- writable: false
- },
- (err) => {
- error = err ? aggregateTwoErrors(error, err) : null
- callback()
- callback = nop
- }
- )
- try {
- while (true) {
- const chunk = stream.destroyed ? null : stream.read()
- if (chunk !== null) {
- yield chunk
- } else if (error) {
- throw error
- } else if (error === null) {
- return
- } else {
- await new Promise(next)
- }
- }
- } catch (err) {
- error = aggregateTwoErrors(error, err)
- throw error
- } finally {
- if (
- (error || (options === null || options === undefined ? undefined : options.destroyOnReturn) !== false) &&
- (error === undefined || stream._readableState.autoDestroy)
- ) {
- destroyImpl.destroyer(stream, null)
- } else {
- stream.off('readable', next)
- cleanup()
- }
- }
- }
- // Making it explicit these properties are not enumerable
- // because otherwise some prototype manipulation in
- // userland will fail.
- ObjectDefineProperties(Readable.prototype, {
- readable: {
- __proto__: null,
- get() {
- const r = this._readableState
- // r.readable === false means that this is part of a Duplex stream
- // where the readable side was disabled upon construction.
- // Compat. The user might manually disable readable side through
- // deprecated setter.
- return !!r && r.readable !== false && !r.destroyed && !r.errorEmitted && !r.endEmitted
- },
- set(val) {
- // Backwards compat.
- if (this._readableState) {
- this._readableState.readable = !!val
- }
- }
- },
- readableDidRead: {
- __proto__: null,
- enumerable: false,
- get: function () {
- return this._readableState.dataEmitted
- }
- },
- readableAborted: {
- __proto__: null,
- enumerable: false,
- get: function () {
- return !!(
- this._readableState.readable !== false &&
- (this._readableState.destroyed || this._readableState.errored) &&
- !this._readableState.endEmitted
- )
- }
- },
- readableHighWaterMark: {
- __proto__: null,
- enumerable: false,
- get: function () {
- return this._readableState.highWaterMark
- }
- },
- readableBuffer: {
- __proto__: null,
- enumerable: false,
- get: function () {
- return this._readableState && this._readableState.buffer
- }
- },
- readableFlowing: {
- __proto__: null,
- enumerable: false,
- get: function () {
- return this._readableState.flowing
- },
- set: function (state) {
- if (this._readableState) {
- this._readableState.flowing = state
- }
- }
- },
- readableLength: {
- __proto__: null,
- enumerable: false,
- get() {
- return this._readableState.length
- }
- },
- readableObjectMode: {
- __proto__: null,
- enumerable: false,
- get() {
- return this._readableState ? this._readableState.objectMode : false
- }
- },
- readableEncoding: {
- __proto__: null,
- enumerable: false,
- get() {
- return this._readableState ? this._readableState.encoding : null
- }
- },
- errored: {
- __proto__: null,
- enumerable: false,
- get() {
- return this._readableState ? this._readableState.errored : null
- }
- },
- closed: {
- __proto__: null,
- get() {
- return this._readableState ? this._readableState.closed : false
- }
- },
- destroyed: {
- __proto__: null,
- enumerable: false,
- get() {
- return this._readableState ? this._readableState.destroyed : false
- },
- set(value) {
- // We ignore the value if the stream
- // has not been initialized yet.
- if (!this._readableState) {
- return
- }
- // Backward compatibility, the user is explicitly
- // managing destroyed.
- this._readableState.destroyed = value
- }
- },
- readableEnded: {
- __proto__: null,
- enumerable: false,
- get() {
- return this._readableState ? this._readableState.endEmitted : false
- }
- }
- })
- ObjectDefineProperties(ReadableState.prototype, {
- // Legacy getter for `pipesCount`.
- pipesCount: {
- __proto__: null,
- get() {
- return this.pipes.length
- }
- },
- // Legacy property for `paused`.
- paused: {
- __proto__: null,
- get() {
- return this[kPaused] !== false
- },
- set(value) {
- this[kPaused] = !!value
- }
- }
- })
- // Exposed for testing purposes only.
- Readable._fromList = fromList
- // Pluck off n bytes from an array of buffers.
- // Length is the combined lengths of all the buffers in the list.
- // This function is designed to be inlinable, so please take care when making
- // changes to the function body.
- function fromList(n, state) {
- // nothing buffered.
- if (state.length === 0) return null
- let ret
- if (state.objectMode) ret = state.buffer.shift()
- else if (!n || n >= state.length) {
- // Read it all, truncate the list.
- if (state.decoder) ret = state.buffer.join('')
- else if (state.buffer.length === 1) ret = state.buffer.first()
- else ret = state.buffer.concat(state.length)
- state.buffer.clear()
- } else {
- // read part of list.
- ret = state.buffer.consume(n, state.decoder)
- }
- return ret
- }
- function endReadable(stream) {
- const state = stream._readableState
- debug('endReadable', state.endEmitted)
- if (!state.endEmitted) {
- state.ended = true
- process.nextTick(endReadableNT, state, stream)
- }
- }
- function endReadableNT(state, stream) {
- debug('endReadableNT', state.endEmitted, state.length)
- // Check that we didn't get one last unshift.
- if (!state.errored && !state.closeEmitted && !state.endEmitted && state.length === 0) {
- state.endEmitted = true
- stream.emit('end')
- if (stream.writable && stream.allowHalfOpen === false) {
- process.nextTick(endWritableNT, stream)
- } else if (state.autoDestroy) {
- // In case of duplex streams we need a way to detect
- // if the writable side is ready for autoDestroy as well.
- const wState = stream._writableState
- const autoDestroy =
- !wState ||
- (wState.autoDestroy &&
- // We don't expect the writable to ever 'finish'
- // if writable is explicitly set to false.
- (wState.finished || wState.writable === false))
- if (autoDestroy) {
- stream.destroy()
- }
- }
- }
- }
- function endWritableNT(stream) {
- const writable = stream.writable && !stream.writableEnded && !stream.destroyed
- if (writable) {
- stream.end()
- }
- }
- Readable.from = function (iterable, opts) {
- return from(Readable, iterable, opts)
- }
- let webStreamsAdapters
- // Lazy to avoid circular references
- function lazyWebStreams() {
- if (webStreamsAdapters === undefined) webStreamsAdapters = {}
- return webStreamsAdapters
- }
- Readable.fromWeb = function (readableStream, options) {
- return lazyWebStreams().newStreamReadableFromReadableStream(readableStream, options)
- }
- Readable.toWeb = function (streamReadable, options) {
- return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable, options)
- }
- Readable.wrap = function (src, options) {
- var _ref, _src$readableObjectMo
- return new Readable({
- objectMode:
- (_ref =
- (_src$readableObjectMo = src.readableObjectMode) !== null && _src$readableObjectMo !== undefined
- ? _src$readableObjectMo
- : src.objectMode) !== null && _ref !== undefined
- ? _ref
- : true,
- ...options,
- destroy(err, callback) {
- destroyImpl.destroyer(src, err)
- callback(err)
- }
- }).wrap(src)
- }
|