readable.js 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290
  1. // Copyright Joyent, Inc. and other Node contributors.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a
  4. // copy of this software and associated documentation files (the
  5. // "Software"), to deal in the Software without restriction, including
  6. // without limitation the rights to use, copy, modify, merge, publish,
  7. // distribute, sublicense, and/or sell copies of the Software, and to permit
  8. // persons to whom the Software is furnished to do so, subject to the
  9. // following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included
  12. // in all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  15. // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  16. // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
  17. // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
  18. // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
  19. // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
  20. // USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. 'use strict'
  22. /* replacement start */
  23. const process = require('process/')
  24. /* replacement end */
  25. const {
  26. ArrayPrototypeIndexOf,
  27. NumberIsInteger,
  28. NumberIsNaN,
  29. NumberParseInt,
  30. ObjectDefineProperties,
  31. ObjectKeys,
  32. ObjectSetPrototypeOf,
  33. Promise,
  34. SafeSet,
  35. SymbolAsyncDispose,
  36. SymbolAsyncIterator,
  37. Symbol
  38. } = require('../../ours/primordials')
  39. module.exports = Readable
  40. Readable.ReadableState = ReadableState
  41. const { EventEmitter: EE } = require('events')
  42. const { Stream, prependListener } = require('./legacy')
  43. const { Buffer } = require('buffer')
  44. const { addAbortSignal } = require('./add-abort-signal')
  45. const eos = require('./end-of-stream')
  46. let debug = require('../../ours/util').debuglog('stream', (fn) => {
  47. debug = fn
  48. })
  49. const BufferList = require('./buffer_list')
  50. const destroyImpl = require('./destroy')
  51. const { getHighWaterMark, getDefaultHighWaterMark } = require('./state')
  52. const {
  53. aggregateTwoErrors,
  54. codes: {
  55. ERR_INVALID_ARG_TYPE,
  56. ERR_METHOD_NOT_IMPLEMENTED,
  57. ERR_OUT_OF_RANGE,
  58. ERR_STREAM_PUSH_AFTER_EOF,
  59. ERR_STREAM_UNSHIFT_AFTER_END_EVENT
  60. },
  61. AbortError
  62. } = require('../../ours/errors')
  63. const { validateObject } = require('../validators')
  64. const kPaused = Symbol('kPaused')
  65. const { StringDecoder } = require('string_decoder/')
  66. const from = require('./from')
  67. ObjectSetPrototypeOf(Readable.prototype, Stream.prototype)
  68. ObjectSetPrototypeOf(Readable, Stream)
  69. const nop = () => {}
  70. const { errorOrDestroy } = destroyImpl
  71. const kObjectMode = 1 << 0
  72. const kEnded = 1 << 1
  73. const kEndEmitted = 1 << 2
  74. const kReading = 1 << 3
  75. const kConstructed = 1 << 4
  76. const kSync = 1 << 5
  77. const kNeedReadable = 1 << 6
  78. const kEmittedReadable = 1 << 7
  79. const kReadableListening = 1 << 8
  80. const kResumeScheduled = 1 << 9
  81. const kErrorEmitted = 1 << 10
  82. const kEmitClose = 1 << 11
  83. const kAutoDestroy = 1 << 12
  84. const kDestroyed = 1 << 13
  85. const kClosed = 1 << 14
  86. const kCloseEmitted = 1 << 15
  87. const kMultiAwaitDrain = 1 << 16
  88. const kReadingMore = 1 << 17
  89. const kDataEmitted = 1 << 18
  90. // TODO(benjamingr) it is likely slower to do it this way than with free functions
  91. function makeBitMapDescriptor(bit) {
  92. return {
  93. enumerable: false,
  94. get() {
  95. return (this.state & bit) !== 0
  96. },
  97. set(value) {
  98. if (value) this.state |= bit
  99. else this.state &= ~bit
  100. }
  101. }
  102. }
  103. ObjectDefineProperties(ReadableState.prototype, {
  104. objectMode: makeBitMapDescriptor(kObjectMode),
  105. ended: makeBitMapDescriptor(kEnded),
  106. endEmitted: makeBitMapDescriptor(kEndEmitted),
  107. reading: makeBitMapDescriptor(kReading),
  108. // Stream is still being constructed and cannot be
  109. // destroyed until construction finished or failed.
  110. // Async construction is opt in, therefore we start as
  111. // constructed.
  112. constructed: makeBitMapDescriptor(kConstructed),
  113. // A flag to be able to tell if the event 'readable'/'data' is emitted
  114. // immediately, or on a later tick. We set this to true at first, because
  115. // any actions that shouldn't happen until "later" should generally also
  116. // not happen before the first read call.
  117. sync: makeBitMapDescriptor(kSync),
  118. // Whenever we return null, then we set a flag to say
  119. // that we're awaiting a 'readable' event emission.
  120. needReadable: makeBitMapDescriptor(kNeedReadable),
  121. emittedReadable: makeBitMapDescriptor(kEmittedReadable),
  122. readableListening: makeBitMapDescriptor(kReadableListening),
  123. resumeScheduled: makeBitMapDescriptor(kResumeScheduled),
  124. // True if the error was already emitted and should not be thrown again.
  125. errorEmitted: makeBitMapDescriptor(kErrorEmitted),
  126. emitClose: makeBitMapDescriptor(kEmitClose),
  127. autoDestroy: makeBitMapDescriptor(kAutoDestroy),
  128. // Has it been destroyed.
  129. destroyed: makeBitMapDescriptor(kDestroyed),
  130. // Indicates whether the stream has finished destroying.
  131. closed: makeBitMapDescriptor(kClosed),
  132. // True if close has been emitted or would have been emitted
  133. // depending on emitClose.
  134. closeEmitted: makeBitMapDescriptor(kCloseEmitted),
  135. multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain),
  136. // If true, a maybeReadMore has been scheduled.
  137. readingMore: makeBitMapDescriptor(kReadingMore),
  138. dataEmitted: makeBitMapDescriptor(kDataEmitted)
  139. })
  140. function ReadableState(options, stream, isDuplex) {
  141. // Duplex streams are both readable and writable, but share
  142. // the same options object.
  143. // However, some cases require setting options to different
  144. // values for the readable and the writable sides of the duplex stream.
  145. // These options can be provided separately as readableXXX and writableXXX.
  146. if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex')
  147. // Bit map field to store ReadableState more effciently with 1 bit per field
  148. // instead of a V8 slot per field.
  149. this.state = kEmitClose | kAutoDestroy | kConstructed | kSync
  150. // Object stream flag. Used to make read(n) ignore n and to
  151. // make all the buffer merging and length checks go away.
  152. if (options && options.objectMode) this.state |= kObjectMode
  153. if (isDuplex && options && options.readableObjectMode) this.state |= kObjectMode
  154. // The point at which it stops calling _read() to fill the buffer
  155. // Note: 0 is a valid value, means "don't call _read preemptively ever"
  156. this.highWaterMark = options
  157. ? getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex)
  158. : getDefaultHighWaterMark(false)
  159. // A linked list is used to store data chunks instead of an array because the
  160. // linked list can remove elements from the beginning faster than
  161. // array.shift().
  162. this.buffer = new BufferList()
  163. this.length = 0
  164. this.pipes = []
  165. this.flowing = null
  166. this[kPaused] = null
  167. // Should close be emitted on destroy. Defaults to true.
  168. if (options && options.emitClose === false) this.state &= ~kEmitClose
  169. // Should .destroy() be called after 'end' (and potentially 'finish').
  170. if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy
  171. // Indicates whether the stream has errored. When true no further
  172. // _read calls, 'data' or 'readable' events should occur. This is needed
  173. // since when autoDestroy is disabled we need a way to tell whether the
  174. // stream has failed.
  175. this.errored = null
  176. // Crypto is kind of old and crusty. Historically, its default string
  177. // encoding is 'binary' so we have to make this configurable.
  178. // Everything else in the universe uses 'utf8', though.
  179. this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'
  180. // Ref the piped dest which we need a drain event on it
  181. // type: null | Writable | Set<Writable>.
  182. this.awaitDrainWriters = null
  183. this.decoder = null
  184. this.encoding = null
  185. if (options && options.encoding) {
  186. this.decoder = new StringDecoder(options.encoding)
  187. this.encoding = options.encoding
  188. }
  189. }
  190. function Readable(options) {
  191. if (!(this instanceof Readable)) return new Readable(options)
  192. // Checking for a Stream.Duplex instance is faster here instead of inside
  193. // the ReadableState constructor, at least with V8 6.5.
  194. const isDuplex = this instanceof require('./duplex')
  195. this._readableState = new ReadableState(options, this, isDuplex)
  196. if (options) {
  197. if (typeof options.read === 'function') this._read = options.read
  198. if (typeof options.destroy === 'function') this._destroy = options.destroy
  199. if (typeof options.construct === 'function') this._construct = options.construct
  200. if (options.signal && !isDuplex) addAbortSignal(options.signal, this)
  201. }
  202. Stream.call(this, options)
  203. destroyImpl.construct(this, () => {
  204. if (this._readableState.needReadable) {
  205. maybeReadMore(this, this._readableState)
  206. }
  207. })
  208. }
  209. Readable.prototype.destroy = destroyImpl.destroy
  210. Readable.prototype._undestroy = destroyImpl.undestroy
  211. Readable.prototype._destroy = function (err, cb) {
  212. cb(err)
  213. }
  214. Readable.prototype[EE.captureRejectionSymbol] = function (err) {
  215. this.destroy(err)
  216. }
  217. Readable.prototype[SymbolAsyncDispose] = function () {
  218. let error
  219. if (!this.destroyed) {
  220. error = this.readableEnded ? null : new AbortError()
  221. this.destroy(error)
  222. }
  223. return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null))))
  224. }
  225. // Manually shove something into the read() buffer.
  226. // This returns true if the highWaterMark has not been hit yet,
  227. // similar to how Writable.write() returns true if you should
  228. // write() some more.
  229. Readable.prototype.push = function (chunk, encoding) {
  230. return readableAddChunk(this, chunk, encoding, false)
  231. }
  232. // Unshift should *always* be something directly out of read().
  233. Readable.prototype.unshift = function (chunk, encoding) {
  234. return readableAddChunk(this, chunk, encoding, true)
  235. }
  236. function readableAddChunk(stream, chunk, encoding, addToFront) {
  237. debug('readableAddChunk', chunk)
  238. const state = stream._readableState
  239. let err
  240. if ((state.state & kObjectMode) === 0) {
  241. if (typeof chunk === 'string') {
  242. encoding = encoding || state.defaultEncoding
  243. if (state.encoding !== encoding) {
  244. if (addToFront && state.encoding) {
  245. // When unshifting, if state.encoding is set, we have to save
  246. // the string in the BufferList with the state encoding.
  247. chunk = Buffer.from(chunk, encoding).toString(state.encoding)
  248. } else {
  249. chunk = Buffer.from(chunk, encoding)
  250. encoding = ''
  251. }
  252. }
  253. } else if (chunk instanceof Buffer) {
  254. encoding = ''
  255. } else if (Stream._isUint8Array(chunk)) {
  256. chunk = Stream._uint8ArrayToBuffer(chunk)
  257. encoding = ''
  258. } else if (chunk != null) {
  259. err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk)
  260. }
  261. }
  262. if (err) {
  263. errorOrDestroy(stream, err)
  264. } else if (chunk === null) {
  265. state.state &= ~kReading
  266. onEofChunk(stream, state)
  267. } else if ((state.state & kObjectMode) !== 0 || (chunk && chunk.length > 0)) {
  268. if (addToFront) {
  269. if ((state.state & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT())
  270. else if (state.destroyed || state.errored) return false
  271. else addChunk(stream, state, chunk, true)
  272. } else if (state.ended) {
  273. errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF())
  274. } else if (state.destroyed || state.errored) {
  275. return false
  276. } else {
  277. state.state &= ~kReading
  278. if (state.decoder && !encoding) {
  279. chunk = state.decoder.write(chunk)
  280. if (state.objectMode || chunk.length !== 0) addChunk(stream, state, chunk, false)
  281. else maybeReadMore(stream, state)
  282. } else {
  283. addChunk(stream, state, chunk, false)
  284. }
  285. }
  286. } else if (!addToFront) {
  287. state.state &= ~kReading
  288. maybeReadMore(stream, state)
  289. }
  290. // We can push more data if we are below the highWaterMark.
  291. // Also, if we have no data yet, we can stand some more bytes.
  292. // This is to work around cases where hwm=0, such as the repl.
  293. return !state.ended && (state.length < state.highWaterMark || state.length === 0)
  294. }
  295. function addChunk(stream, state, chunk, addToFront) {
  296. if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) {
  297. // Use the guard to avoid creating `Set()` repeatedly
  298. // when we have multiple pipes.
  299. if ((state.state & kMultiAwaitDrain) !== 0) {
  300. state.awaitDrainWriters.clear()
  301. } else {
  302. state.awaitDrainWriters = null
  303. }
  304. state.dataEmitted = true
  305. stream.emit('data', chunk)
  306. } else {
  307. // Update the buffer info.
  308. state.length += state.objectMode ? 1 : chunk.length
  309. if (addToFront) state.buffer.unshift(chunk)
  310. else state.buffer.push(chunk)
  311. if ((state.state & kNeedReadable) !== 0) emitReadable(stream)
  312. }
  313. maybeReadMore(stream, state)
  314. }
  315. Readable.prototype.isPaused = function () {
  316. const state = this._readableState
  317. return state[kPaused] === true || state.flowing === false
  318. }
  319. // Backwards compatibility.
  320. Readable.prototype.setEncoding = function (enc) {
  321. const decoder = new StringDecoder(enc)
  322. this._readableState.decoder = decoder
  323. // If setEncoding(null), decoder.encoding equals utf8.
  324. this._readableState.encoding = this._readableState.decoder.encoding
  325. const buffer = this._readableState.buffer
  326. // Iterate over current buffer to convert already stored Buffers:
  327. let content = ''
  328. for (const data of buffer) {
  329. content += decoder.write(data)
  330. }
  331. buffer.clear()
  332. if (content !== '') buffer.push(content)
  333. this._readableState.length = content.length
  334. return this
  335. }
  336. // Don't raise the hwm > 1GB.
  337. const MAX_HWM = 0x40000000
  338. function computeNewHighWaterMark(n) {
  339. if (n > MAX_HWM) {
  340. throw new ERR_OUT_OF_RANGE('size', '<= 1GiB', n)
  341. } else {
  342. // Get the next highest power of 2 to prevent increasing hwm excessively in
  343. // tiny amounts.
  344. n--
  345. n |= n >>> 1
  346. n |= n >>> 2
  347. n |= n >>> 4
  348. n |= n >>> 8
  349. n |= n >>> 16
  350. n++
  351. }
  352. return n
  353. }
  354. // This function is designed to be inlinable, so please take care when making
  355. // changes to the function body.
  356. function howMuchToRead(n, state) {
  357. if (n <= 0 || (state.length === 0 && state.ended)) return 0
  358. if ((state.state & kObjectMode) !== 0) return 1
  359. if (NumberIsNaN(n)) {
  360. // Only flow one buffer at a time.
  361. if (state.flowing && state.length) return state.buffer.first().length
  362. return state.length
  363. }
  364. if (n <= state.length) return n
  365. return state.ended ? state.length : 0
  366. }
  367. // You can override either this method, or the async _read(n) below.
  368. Readable.prototype.read = function (n) {
  369. debug('read', n)
  370. // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
  371. // in this scenario, so we are doing it manually.
  372. if (n === undefined) {
  373. n = NaN
  374. } else if (!NumberIsInteger(n)) {
  375. n = NumberParseInt(n, 10)
  376. }
  377. const state = this._readableState
  378. const nOrig = n
  379. // If we're asking for more than the current hwm, then raise the hwm.
  380. if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n)
  381. if (n !== 0) state.state &= ~kEmittedReadable
  382. // If we're doing read(0) to trigger a readable event, but we
  383. // already have a bunch of data in the buffer, then just trigger
  384. // the 'readable' event and move on.
  385. if (
  386. n === 0 &&
  387. state.needReadable &&
  388. ((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || state.ended)
  389. ) {
  390. debug('read: emitReadable', state.length, state.ended)
  391. if (state.length === 0 && state.ended) endReadable(this)
  392. else emitReadable(this)
  393. return null
  394. }
  395. n = howMuchToRead(n, state)
  396. // If we've ended, and we're now clear, then finish it up.
  397. if (n === 0 && state.ended) {
  398. if (state.length === 0) endReadable(this)
  399. return null
  400. }
  401. // All the actual chunk generation logic needs to be
  402. // *below* the call to _read. The reason is that in certain
  403. // synthetic stream cases, such as passthrough streams, _read
  404. // may be a completely synchronous operation which may change
  405. // the state of the read buffer, providing enough data when
  406. // before there was *not* enough.
  407. //
  408. // So, the steps are:
  409. // 1. Figure out what the state of things will be after we do
  410. // a read from the buffer.
  411. //
  412. // 2. If that resulting state will trigger a _read, then call _read.
  413. // Note that this may be asynchronous, or synchronous. Yes, it is
  414. // deeply ugly to write APIs this way, but that still doesn't mean
  415. // that the Readable class should behave improperly, as streams are
  416. // designed to be sync/async agnostic.
  417. // Take note if the _read call is sync or async (ie, if the read call
  418. // has returned yet), so that we know whether or not it's safe to emit
  419. // 'readable' etc.
  420. //
  421. // 3. Actually pull the requested chunks out of the buffer and return.
  422. // if we need a readable event, then we need to do some reading.
  423. let doRead = (state.state & kNeedReadable) !== 0
  424. debug('need readable', doRead)
  425. // If we currently have less than the highWaterMark, then also read some.
  426. if (state.length === 0 || state.length - n < state.highWaterMark) {
  427. doRead = true
  428. debug('length less than watermark', doRead)
  429. }
  430. // However, if we've ended, then there's no point, if we're already
  431. // reading, then it's unnecessary, if we're constructing we have to wait,
  432. // and if we're destroyed or errored, then it's not allowed,
  433. if (state.ended || state.reading || state.destroyed || state.errored || !state.constructed) {
  434. doRead = false
  435. debug('reading, ended or constructing', doRead)
  436. } else if (doRead) {
  437. debug('do read')
  438. state.state |= kReading | kSync
  439. // If the length is currently zero, then we *need* a readable event.
  440. if (state.length === 0) state.state |= kNeedReadable
  441. // Call internal read method
  442. try {
  443. this._read(state.highWaterMark)
  444. } catch (err) {
  445. errorOrDestroy(this, err)
  446. }
  447. state.state &= ~kSync
  448. // If _read pushed data synchronously, then `reading` will be false,
  449. // and we need to re-evaluate how much data we can return to the user.
  450. if (!state.reading) n = howMuchToRead(nOrig, state)
  451. }
  452. let ret
  453. if (n > 0) ret = fromList(n, state)
  454. else ret = null
  455. if (ret === null) {
  456. state.needReadable = state.length <= state.highWaterMark
  457. n = 0
  458. } else {
  459. state.length -= n
  460. if (state.multiAwaitDrain) {
  461. state.awaitDrainWriters.clear()
  462. } else {
  463. state.awaitDrainWriters = null
  464. }
  465. }
  466. if (state.length === 0) {
  467. // If we have nothing in the buffer, then we want to know
  468. // as soon as we *do* get something into the buffer.
  469. if (!state.ended) state.needReadable = true
  470. // If we tried to read() past the EOF, then emit end on the next tick.
  471. if (nOrig !== n && state.ended) endReadable(this)
  472. }
  473. if (ret !== null && !state.errorEmitted && !state.closeEmitted) {
  474. state.dataEmitted = true
  475. this.emit('data', ret)
  476. }
  477. return ret
  478. }
  479. function onEofChunk(stream, state) {
  480. debug('onEofChunk')
  481. if (state.ended) return
  482. if (state.decoder) {
  483. const chunk = state.decoder.end()
  484. if (chunk && chunk.length) {
  485. state.buffer.push(chunk)
  486. state.length += state.objectMode ? 1 : chunk.length
  487. }
  488. }
  489. state.ended = true
  490. if (state.sync) {
  491. // If we are sync, wait until next tick to emit the data.
  492. // Otherwise we risk emitting data in the flow()
  493. // the readable code triggers during a read() call.
  494. emitReadable(stream)
  495. } else {
  496. // Emit 'readable' now to make sure it gets picked up.
  497. state.needReadable = false
  498. state.emittedReadable = true
  499. // We have to emit readable now that we are EOF. Modules
  500. // in the ecosystem (e.g. dicer) rely on this event being sync.
  501. emitReadable_(stream)
  502. }
  503. }
  504. // Don't emit readable right away in sync mode, because this can trigger
  505. // another read() call => stack overflow. This way, it might trigger
  506. // a nextTick recursion warning, but that's not so bad.
  507. function emitReadable(stream) {
  508. const state = stream._readableState
  509. debug('emitReadable', state.needReadable, state.emittedReadable)
  510. state.needReadable = false
  511. if (!state.emittedReadable) {
  512. debug('emitReadable', state.flowing)
  513. state.emittedReadable = true
  514. process.nextTick(emitReadable_, stream)
  515. }
  516. }
  517. function emitReadable_(stream) {
  518. const state = stream._readableState
  519. debug('emitReadable_', state.destroyed, state.length, state.ended)
  520. if (!state.destroyed && !state.errored && (state.length || state.ended)) {
  521. stream.emit('readable')
  522. state.emittedReadable = false
  523. }
  524. // The stream needs another readable event if:
  525. // 1. It is not flowing, as the flow mechanism will take
  526. // care of it.
  527. // 2. It is not ended.
  528. // 3. It is below the highWaterMark, so we can schedule
  529. // another readable later.
  530. state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark
  531. flow(stream)
  532. }
  533. // At this point, the user has presumably seen the 'readable' event,
  534. // and called read() to consume some data. that may have triggered
  535. // in turn another _read(n) call, in which case reading = true if
  536. // it's in progress.
  537. // However, if we're not ended, or reading, and the length < hwm,
  538. // then go ahead and try to read some more preemptively.
  539. function maybeReadMore(stream, state) {
  540. if (!state.readingMore && state.constructed) {
  541. state.readingMore = true
  542. process.nextTick(maybeReadMore_, stream, state)
  543. }
  544. }
  545. function maybeReadMore_(stream, state) {
  546. // Attempt to read more data if we should.
  547. //
  548. // The conditions for reading more data are (one of):
  549. // - Not enough data buffered (state.length < state.highWaterMark). The loop
  550. // is responsible for filling the buffer with enough data if such data
  551. // is available. If highWaterMark is 0 and we are not in the flowing mode
  552. // we should _not_ attempt to buffer any extra data. We'll get more data
  553. // when the stream consumer calls read() instead.
  554. // - No data in the buffer, and the stream is in flowing mode. In this mode
  555. // the loop below is responsible for ensuring read() is called. Failing to
  556. // call read here would abort the flow and there's no other mechanism for
  557. // continuing the flow if the stream consumer has just subscribed to the
  558. // 'data' event.
  559. //
  560. // In addition to the above conditions to keep reading data, the following
  561. // conditions prevent the data from being read:
  562. // - The stream has ended (state.ended).
  563. // - There is already a pending 'read' operation (state.reading). This is a
  564. // case where the stream has called the implementation defined _read()
  565. // method, but they are processing the call asynchronously and have _not_
  566. // called push() with new data. In this case we skip performing more
  567. // read()s. The execution ends in this method again after the _read() ends
  568. // up calling push() with more data.
  569. while (
  570. !state.reading &&
  571. !state.ended &&
  572. (state.length < state.highWaterMark || (state.flowing && state.length === 0))
  573. ) {
  574. const len = state.length
  575. debug('maybeReadMore read 0')
  576. stream.read(0)
  577. if (len === state.length)
  578. // Didn't get any data, stop spinning.
  579. break
  580. }
  581. state.readingMore = false
  582. }
  583. // Abstract method. to be overridden in specific implementation classes.
  584. // call cb(er, data) where data is <= n in length.
  585. // for virtual (non-string, non-buffer) streams, "length" is somewhat
  586. // arbitrary, and perhaps not very meaningful.
  587. Readable.prototype._read = function (n) {
  588. throw new ERR_METHOD_NOT_IMPLEMENTED('_read()')
  589. }
  590. Readable.prototype.pipe = function (dest, pipeOpts) {
  591. const src = this
  592. const state = this._readableState
  593. if (state.pipes.length === 1) {
  594. if (!state.multiAwaitDrain) {
  595. state.multiAwaitDrain = true
  596. state.awaitDrainWriters = new SafeSet(state.awaitDrainWriters ? [state.awaitDrainWriters] : [])
  597. }
  598. }
  599. state.pipes.push(dest)
  600. debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts)
  601. const doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr
  602. const endFn = doEnd ? onend : unpipe
  603. if (state.endEmitted) process.nextTick(endFn)
  604. else src.once('end', endFn)
  605. dest.on('unpipe', onunpipe)
  606. function onunpipe(readable, unpipeInfo) {
  607. debug('onunpipe')
  608. if (readable === src) {
  609. if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
  610. unpipeInfo.hasUnpiped = true
  611. cleanup()
  612. }
  613. }
  614. }
  615. function onend() {
  616. debug('onend')
  617. dest.end()
  618. }
  619. let ondrain
  620. let cleanedUp = false
  621. function cleanup() {
  622. debug('cleanup')
  623. // Cleanup event handlers once the pipe is broken.
  624. dest.removeListener('close', onclose)
  625. dest.removeListener('finish', onfinish)
  626. if (ondrain) {
  627. dest.removeListener('drain', ondrain)
  628. }
  629. dest.removeListener('error', onerror)
  630. dest.removeListener('unpipe', onunpipe)
  631. src.removeListener('end', onend)
  632. src.removeListener('end', unpipe)
  633. src.removeListener('data', ondata)
  634. cleanedUp = true
  635. // If the reader is waiting for a drain event from this
  636. // specific writer, then it would cause it to never start
  637. // flowing again.
  638. // So, if this is awaiting a drain, then we just call it now.
  639. // If we don't know, then assume that we are waiting for one.
  640. if (ondrain && state.awaitDrainWriters && (!dest._writableState || dest._writableState.needDrain)) ondrain()
  641. }
  642. function pause() {
  643. // If the user unpiped during `dest.write()`, it is possible
  644. // to get stuck in a permanently paused state if that write
  645. // also returned false.
  646. // => Check whether `dest` is still a piping destination.
  647. if (!cleanedUp) {
  648. if (state.pipes.length === 1 && state.pipes[0] === dest) {
  649. debug('false write response, pause', 0)
  650. state.awaitDrainWriters = dest
  651. state.multiAwaitDrain = false
  652. } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
  653. debug('false write response, pause', state.awaitDrainWriters.size)
  654. state.awaitDrainWriters.add(dest)
  655. }
  656. src.pause()
  657. }
  658. if (!ondrain) {
  659. // When the dest drains, it reduces the awaitDrain counter
  660. // on the source. This would be more elegant with a .once()
  661. // handler in flow(), but adding and removing repeatedly is
  662. // too slow.
  663. ondrain = pipeOnDrain(src, dest)
  664. dest.on('drain', ondrain)
  665. }
  666. }
  667. src.on('data', ondata)
  668. function ondata(chunk) {
  669. debug('ondata')
  670. const ret = dest.write(chunk)
  671. debug('dest.write', ret)
  672. if (ret === false) {
  673. pause()
  674. }
  675. }
  676. // If the dest has an error, then stop piping into it.
  677. // However, don't suppress the throwing behavior for this.
  678. function onerror(er) {
  679. debug('onerror', er)
  680. unpipe()
  681. dest.removeListener('error', onerror)
  682. if (dest.listenerCount('error') === 0) {
  683. const s = dest._writableState || dest._readableState
  684. if (s && !s.errorEmitted) {
  685. // User incorrectly emitted 'error' directly on the stream.
  686. errorOrDestroy(dest, er)
  687. } else {
  688. dest.emit('error', er)
  689. }
  690. }
  691. }
  692. // Make sure our error handler is attached before userland ones.
  693. prependListener(dest, 'error', onerror)
  694. // Both close and finish should trigger unpipe, but only once.
  695. function onclose() {
  696. dest.removeListener('finish', onfinish)
  697. unpipe()
  698. }
  699. dest.once('close', onclose)
  700. function onfinish() {
  701. debug('onfinish')
  702. dest.removeListener('close', onclose)
  703. unpipe()
  704. }
  705. dest.once('finish', onfinish)
  706. function unpipe() {
  707. debug('unpipe')
  708. src.unpipe(dest)
  709. }
  710. // Tell the dest that it's being piped to.
  711. dest.emit('pipe', src)
  712. // Start the flow if it hasn't been started already.
  713. if (dest.writableNeedDrain === true) {
  714. pause()
  715. } else if (!state.flowing) {
  716. debug('pipe resume')
  717. src.resume()
  718. }
  719. return dest
  720. }
  721. function pipeOnDrain(src, dest) {
  722. return function pipeOnDrainFunctionResult() {
  723. const state = src._readableState
  724. // `ondrain` will call directly,
  725. // `this` maybe not a reference to dest,
  726. // so we use the real dest here.
  727. if (state.awaitDrainWriters === dest) {
  728. debug('pipeOnDrain', 1)
  729. state.awaitDrainWriters = null
  730. } else if (state.multiAwaitDrain) {
  731. debug('pipeOnDrain', state.awaitDrainWriters.size)
  732. state.awaitDrainWriters.delete(dest)
  733. }
  734. if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && src.listenerCount('data')) {
  735. src.resume()
  736. }
  737. }
  738. }
  739. Readable.prototype.unpipe = function (dest) {
  740. const state = this._readableState
  741. const unpipeInfo = {
  742. hasUnpiped: false
  743. }
  744. // If we're not piping anywhere, then do nothing.
  745. if (state.pipes.length === 0) return this
  746. if (!dest) {
  747. // remove all.
  748. const dests = state.pipes
  749. state.pipes = []
  750. this.pause()
  751. for (let i = 0; i < dests.length; i++)
  752. dests[i].emit('unpipe', this, {
  753. hasUnpiped: false
  754. })
  755. return this
  756. }
  757. // Try to find the right one.
  758. const index = ArrayPrototypeIndexOf(state.pipes, dest)
  759. if (index === -1) return this
  760. state.pipes.splice(index, 1)
  761. if (state.pipes.length === 0) this.pause()
  762. dest.emit('unpipe', this, unpipeInfo)
  763. return this
  764. }
  765. // Set up data events if they are asked for
  766. // Ensure readable listeners eventually get something.
  767. Readable.prototype.on = function (ev, fn) {
  768. const res = Stream.prototype.on.call(this, ev, fn)
  769. const state = this._readableState
  770. if (ev === 'data') {
  771. // Update readableListening so that resume() may be a no-op
  772. // a few lines down. This is needed to support once('readable').
  773. state.readableListening = this.listenerCount('readable') > 0
  774. // Try start flowing on next tick if stream isn't explicitly paused.
  775. if (state.flowing !== false) this.resume()
  776. } else if (ev === 'readable') {
  777. if (!state.endEmitted && !state.readableListening) {
  778. state.readableListening = state.needReadable = true
  779. state.flowing = false
  780. state.emittedReadable = false
  781. debug('on readable', state.length, state.reading)
  782. if (state.length) {
  783. emitReadable(this)
  784. } else if (!state.reading) {
  785. process.nextTick(nReadingNextTick, this)
  786. }
  787. }
  788. }
  789. return res
  790. }
  791. Readable.prototype.addListener = Readable.prototype.on
  792. Readable.prototype.removeListener = function (ev, fn) {
  793. const res = Stream.prototype.removeListener.call(this, ev, fn)
  794. if (ev === 'readable') {
  795. // We need to check if there is someone still listening to
  796. // readable and reset the state. However this needs to happen
  797. // after readable has been emitted but before I/O (nextTick) to
  798. // support once('readable', fn) cycles. This means that calling
  799. // resume within the same tick will have no
  800. // effect.
  801. process.nextTick(updateReadableListening, this)
  802. }
  803. return res
  804. }
  805. Readable.prototype.off = Readable.prototype.removeListener
  806. Readable.prototype.removeAllListeners = function (ev) {
  807. const res = Stream.prototype.removeAllListeners.apply(this, arguments)
  808. if (ev === 'readable' || ev === undefined) {
  809. // We need to check if there is someone still listening to
  810. // readable and reset the state. However this needs to happen
  811. // after readable has been emitted but before I/O (nextTick) to
  812. // support once('readable', fn) cycles. This means that calling
  813. // resume within the same tick will have no
  814. // effect.
  815. process.nextTick(updateReadableListening, this)
  816. }
  817. return res
  818. }
  819. function updateReadableListening(self) {
  820. const state = self._readableState
  821. state.readableListening = self.listenerCount('readable') > 0
  822. if (state.resumeScheduled && state[kPaused] === false) {
  823. // Flowing needs to be set to true now, otherwise
  824. // the upcoming resume will not flow.
  825. state.flowing = true
  826. // Crude way to check if we should resume.
  827. } else if (self.listenerCount('data') > 0) {
  828. self.resume()
  829. } else if (!state.readableListening) {
  830. state.flowing = null
  831. }
  832. }
  833. function nReadingNextTick(self) {
  834. debug('readable nexttick read 0')
  835. self.read(0)
  836. }
  837. // pause() and resume() are remnants of the legacy readable stream API
  838. // If the user uses them, then switch into old mode.
  839. Readable.prototype.resume = function () {
  840. const state = this._readableState
  841. if (!state.flowing) {
  842. debug('resume')
  843. // We flow only if there is no one listening
  844. // for readable, but we still have to call
  845. // resume().
  846. state.flowing = !state.readableListening
  847. resume(this, state)
  848. }
  849. state[kPaused] = false
  850. return this
  851. }
  852. function resume(stream, state) {
  853. if (!state.resumeScheduled) {
  854. state.resumeScheduled = true
  855. process.nextTick(resume_, stream, state)
  856. }
  857. }
  858. function resume_(stream, state) {
  859. debug('resume', state.reading)
  860. if (!state.reading) {
  861. stream.read(0)
  862. }
  863. state.resumeScheduled = false
  864. stream.emit('resume')
  865. flow(stream)
  866. if (state.flowing && !state.reading) stream.read(0)
  867. }
  868. Readable.prototype.pause = function () {
  869. debug('call pause flowing=%j', this._readableState.flowing)
  870. if (this._readableState.flowing !== false) {
  871. debug('pause')
  872. this._readableState.flowing = false
  873. this.emit('pause')
  874. }
  875. this._readableState[kPaused] = true
  876. return this
  877. }
  878. function flow(stream) {
  879. const state = stream._readableState
  880. debug('flow', state.flowing)
  881. while (state.flowing && stream.read() !== null);
  882. }
  883. // Wrap an old-style stream as the async data source.
  884. // This is *not* part of the readable stream interface.
  885. // It is an ugly unfortunate mess of history.
  886. Readable.prototype.wrap = function (stream) {
  887. let paused = false
  888. // TODO (ronag): Should this.destroy(err) emit
  889. // 'error' on the wrapped stream? Would require
  890. // a static factory method, e.g. Readable.wrap(stream).
  891. stream.on('data', (chunk) => {
  892. if (!this.push(chunk) && stream.pause) {
  893. paused = true
  894. stream.pause()
  895. }
  896. })
  897. stream.on('end', () => {
  898. this.push(null)
  899. })
  900. stream.on('error', (err) => {
  901. errorOrDestroy(this, err)
  902. })
  903. stream.on('close', () => {
  904. this.destroy()
  905. })
  906. stream.on('destroy', () => {
  907. this.destroy()
  908. })
  909. this._read = () => {
  910. if (paused && stream.resume) {
  911. paused = false
  912. stream.resume()
  913. }
  914. }
  915. // Proxy all the other methods. Important when wrapping filters and duplexes.
  916. const streamKeys = ObjectKeys(stream)
  917. for (let j = 1; j < streamKeys.length; j++) {
  918. const i = streamKeys[j]
  919. if (this[i] === undefined && typeof stream[i] === 'function') {
  920. this[i] = stream[i].bind(stream)
  921. }
  922. }
  923. return this
  924. }
  925. Readable.prototype[SymbolAsyncIterator] = function () {
  926. return streamToAsyncIterator(this)
  927. }
  928. Readable.prototype.iterator = function (options) {
  929. if (options !== undefined) {
  930. validateObject(options, 'options')
  931. }
  932. return streamToAsyncIterator(this, options)
  933. }
  934. function streamToAsyncIterator(stream, options) {
  935. if (typeof stream.read !== 'function') {
  936. stream = Readable.wrap(stream, {
  937. objectMode: true
  938. })
  939. }
  940. const iter = createAsyncIterator(stream, options)
  941. iter.stream = stream
  942. return iter
  943. }
  944. async function* createAsyncIterator(stream, options) {
  945. let callback = nop
  946. function next(resolve) {
  947. if (this === stream) {
  948. callback()
  949. callback = nop
  950. } else {
  951. callback = resolve
  952. }
  953. }
  954. stream.on('readable', next)
  955. let error
  956. const cleanup = eos(
  957. stream,
  958. {
  959. writable: false
  960. },
  961. (err) => {
  962. error = err ? aggregateTwoErrors(error, err) : null
  963. callback()
  964. callback = nop
  965. }
  966. )
  967. try {
  968. while (true) {
  969. const chunk = stream.destroyed ? null : stream.read()
  970. if (chunk !== null) {
  971. yield chunk
  972. } else if (error) {
  973. throw error
  974. } else if (error === null) {
  975. return
  976. } else {
  977. await new Promise(next)
  978. }
  979. }
  980. } catch (err) {
  981. error = aggregateTwoErrors(error, err)
  982. throw error
  983. } finally {
  984. if (
  985. (error || (options === null || options === undefined ? undefined : options.destroyOnReturn) !== false) &&
  986. (error === undefined || stream._readableState.autoDestroy)
  987. ) {
  988. destroyImpl.destroyer(stream, null)
  989. } else {
  990. stream.off('readable', next)
  991. cleanup()
  992. }
  993. }
  994. }
  995. // Making it explicit these properties are not enumerable
  996. // because otherwise some prototype manipulation in
  997. // userland will fail.
  998. ObjectDefineProperties(Readable.prototype, {
  999. readable: {
  1000. __proto__: null,
  1001. get() {
  1002. const r = this._readableState
  1003. // r.readable === false means that this is part of a Duplex stream
  1004. // where the readable side was disabled upon construction.
  1005. // Compat. The user might manually disable readable side through
  1006. // deprecated setter.
  1007. return !!r && r.readable !== false && !r.destroyed && !r.errorEmitted && !r.endEmitted
  1008. },
  1009. set(val) {
  1010. // Backwards compat.
  1011. if (this._readableState) {
  1012. this._readableState.readable = !!val
  1013. }
  1014. }
  1015. },
  1016. readableDidRead: {
  1017. __proto__: null,
  1018. enumerable: false,
  1019. get: function () {
  1020. return this._readableState.dataEmitted
  1021. }
  1022. },
  1023. readableAborted: {
  1024. __proto__: null,
  1025. enumerable: false,
  1026. get: function () {
  1027. return !!(
  1028. this._readableState.readable !== false &&
  1029. (this._readableState.destroyed || this._readableState.errored) &&
  1030. !this._readableState.endEmitted
  1031. )
  1032. }
  1033. },
  1034. readableHighWaterMark: {
  1035. __proto__: null,
  1036. enumerable: false,
  1037. get: function () {
  1038. return this._readableState.highWaterMark
  1039. }
  1040. },
  1041. readableBuffer: {
  1042. __proto__: null,
  1043. enumerable: false,
  1044. get: function () {
  1045. return this._readableState && this._readableState.buffer
  1046. }
  1047. },
  1048. readableFlowing: {
  1049. __proto__: null,
  1050. enumerable: false,
  1051. get: function () {
  1052. return this._readableState.flowing
  1053. },
  1054. set: function (state) {
  1055. if (this._readableState) {
  1056. this._readableState.flowing = state
  1057. }
  1058. }
  1059. },
  1060. readableLength: {
  1061. __proto__: null,
  1062. enumerable: false,
  1063. get() {
  1064. return this._readableState.length
  1065. }
  1066. },
  1067. readableObjectMode: {
  1068. __proto__: null,
  1069. enumerable: false,
  1070. get() {
  1071. return this._readableState ? this._readableState.objectMode : false
  1072. }
  1073. },
  1074. readableEncoding: {
  1075. __proto__: null,
  1076. enumerable: false,
  1077. get() {
  1078. return this._readableState ? this._readableState.encoding : null
  1079. }
  1080. },
  1081. errored: {
  1082. __proto__: null,
  1083. enumerable: false,
  1084. get() {
  1085. return this._readableState ? this._readableState.errored : null
  1086. }
  1087. },
  1088. closed: {
  1089. __proto__: null,
  1090. get() {
  1091. return this._readableState ? this._readableState.closed : false
  1092. }
  1093. },
  1094. destroyed: {
  1095. __proto__: null,
  1096. enumerable: false,
  1097. get() {
  1098. return this._readableState ? this._readableState.destroyed : false
  1099. },
  1100. set(value) {
  1101. // We ignore the value if the stream
  1102. // has not been initialized yet.
  1103. if (!this._readableState) {
  1104. return
  1105. }
  1106. // Backward compatibility, the user is explicitly
  1107. // managing destroyed.
  1108. this._readableState.destroyed = value
  1109. }
  1110. },
  1111. readableEnded: {
  1112. __proto__: null,
  1113. enumerable: false,
  1114. get() {
  1115. return this._readableState ? this._readableState.endEmitted : false
  1116. }
  1117. }
  1118. })
  1119. ObjectDefineProperties(ReadableState.prototype, {
  1120. // Legacy getter for `pipesCount`.
  1121. pipesCount: {
  1122. __proto__: null,
  1123. get() {
  1124. return this.pipes.length
  1125. }
  1126. },
  1127. // Legacy property for `paused`.
  1128. paused: {
  1129. __proto__: null,
  1130. get() {
  1131. return this[kPaused] !== false
  1132. },
  1133. set(value) {
  1134. this[kPaused] = !!value
  1135. }
  1136. }
  1137. })
  1138. // Exposed for testing purposes only.
  1139. Readable._fromList = fromList
  1140. // Pluck off n bytes from an array of buffers.
  1141. // Length is the combined lengths of all the buffers in the list.
  1142. // This function is designed to be inlinable, so please take care when making
  1143. // changes to the function body.
  1144. function fromList(n, state) {
  1145. // nothing buffered.
  1146. if (state.length === 0) return null
  1147. let ret
  1148. if (state.objectMode) ret = state.buffer.shift()
  1149. else if (!n || n >= state.length) {
  1150. // Read it all, truncate the list.
  1151. if (state.decoder) ret = state.buffer.join('')
  1152. else if (state.buffer.length === 1) ret = state.buffer.first()
  1153. else ret = state.buffer.concat(state.length)
  1154. state.buffer.clear()
  1155. } else {
  1156. // read part of list.
  1157. ret = state.buffer.consume(n, state.decoder)
  1158. }
  1159. return ret
  1160. }
  1161. function endReadable(stream) {
  1162. const state = stream._readableState
  1163. debug('endReadable', state.endEmitted)
  1164. if (!state.endEmitted) {
  1165. state.ended = true
  1166. process.nextTick(endReadableNT, state, stream)
  1167. }
  1168. }
  1169. function endReadableNT(state, stream) {
  1170. debug('endReadableNT', state.endEmitted, state.length)
  1171. // Check that we didn't get one last unshift.
  1172. if (!state.errored && !state.closeEmitted && !state.endEmitted && state.length === 0) {
  1173. state.endEmitted = true
  1174. stream.emit('end')
  1175. if (stream.writable && stream.allowHalfOpen === false) {
  1176. process.nextTick(endWritableNT, stream)
  1177. } else if (state.autoDestroy) {
  1178. // In case of duplex streams we need a way to detect
  1179. // if the writable side is ready for autoDestroy as well.
  1180. const wState = stream._writableState
  1181. const autoDestroy =
  1182. !wState ||
  1183. (wState.autoDestroy &&
  1184. // We don't expect the writable to ever 'finish'
  1185. // if writable is explicitly set to false.
  1186. (wState.finished || wState.writable === false))
  1187. if (autoDestroy) {
  1188. stream.destroy()
  1189. }
  1190. }
  1191. }
  1192. }
  1193. function endWritableNT(stream) {
  1194. const writable = stream.writable && !stream.writableEnded && !stream.destroyed
  1195. if (writable) {
  1196. stream.end()
  1197. }
  1198. }
  1199. Readable.from = function (iterable, opts) {
  1200. return from(Readable, iterable, opts)
  1201. }
  1202. let webStreamsAdapters
  1203. // Lazy to avoid circular references
  1204. function lazyWebStreams() {
  1205. if (webStreamsAdapters === undefined) webStreamsAdapters = {}
  1206. return webStreamsAdapters
  1207. }
  1208. Readable.fromWeb = function (readableStream, options) {
  1209. return lazyWebStreams().newStreamReadableFromReadableStream(readableStream, options)
  1210. }
  1211. Readable.toWeb = function (streamReadable, options) {
  1212. return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable, options)
  1213. }
  1214. Readable.wrap = function (src, options) {
  1215. var _ref, _src$readableObjectMo
  1216. return new Readable({
  1217. objectMode:
  1218. (_ref =
  1219. (_src$readableObjectMo = src.readableObjectMode) !== null && _src$readableObjectMo !== undefined
  1220. ? _src$readableObjectMo
  1221. : src.objectMode) !== null && _ref !== undefined
  1222. ? _ref
  1223. : true,
  1224. ...options,
  1225. destroy(err, callback) {
  1226. destroyImpl.destroyer(src, err)
  1227. callback(err)
  1228. }
  1229. }).wrap(src)
  1230. }