| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- 'use strict'
- /* replacement start */
- const process = require('process/')
- /* replacement end */
- const {
- aggregateTwoErrors,
- codes: { ERR_MULTIPLE_CALLBACK },
- AbortError
- } = require('../../ours/errors')
- const { Symbol } = require('../../ours/primordials')
- const { kIsDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils')
- const kDestroy = Symbol('kDestroy')
- const kConstruct = Symbol('kConstruct')
- function checkError(err, w, r) {
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack // eslint-disable-line no-unused-expressions
- if (w && !w.errored) {
- w.errored = err
- }
- if (r && !r.errored) {
- r.errored = err
- }
- }
- }
- // Backwards compat. cb() is undocumented and unused in core but
- // unfortunately might be used by modules.
- function destroy(err, cb) {
- const r = this._readableState
- const w = this._writableState
- // With duplex streams we use the writable side for state.
- const s = w || r
- if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) {
- if (typeof cb === 'function') {
- cb()
- }
- return this
- }
- // We set destroyed to true before firing error callbacks in order
- // to make it re-entrance safe in case destroy() is called within callbacks
- checkError(err, w, r)
- if (w) {
- w.destroyed = true
- }
- if (r) {
- r.destroyed = true
- }
- // If still constructing then defer calling _destroy.
- if (!s.constructed) {
- this.once(kDestroy, function (er) {
- _destroy(this, aggregateTwoErrors(er, err), cb)
- })
- } else {
- _destroy(this, err, cb)
- }
- return this
- }
- function _destroy(self, err, cb) {
- let called = false
- function onDestroy(err) {
- if (called) {
- return
- }
- called = true
- const r = self._readableState
- const w = self._writableState
- checkError(err, w, r)
- if (w) {
- w.closed = true
- }
- if (r) {
- r.closed = true
- }
- if (typeof cb === 'function') {
- cb(err)
- }
- if (err) {
- process.nextTick(emitErrorCloseNT, self, err)
- } else {
- process.nextTick(emitCloseNT, self)
- }
- }
- try {
- self._destroy(err || null, onDestroy)
- } catch (err) {
- onDestroy(err)
- }
- }
- function emitErrorCloseNT(self, err) {
- emitErrorNT(self, err)
- emitCloseNT(self)
- }
- function emitCloseNT(self) {
- const r = self._readableState
- const w = self._writableState
- if (w) {
- w.closeEmitted = true
- }
- if (r) {
- r.closeEmitted = true
- }
- if ((w !== null && w !== undefined && w.emitClose) || (r !== null && r !== undefined && r.emitClose)) {
- self.emit('close')
- }
- }
- function emitErrorNT(self, err) {
- const r = self._readableState
- const w = self._writableState
- if ((w !== null && w !== undefined && w.errorEmitted) || (r !== null && r !== undefined && r.errorEmitted)) {
- return
- }
- if (w) {
- w.errorEmitted = true
- }
- if (r) {
- r.errorEmitted = true
- }
- self.emit('error', err)
- }
- function undestroy() {
- const r = this._readableState
- const w = this._writableState
- if (r) {
- r.constructed = true
- r.closed = false
- r.closeEmitted = false
- r.destroyed = false
- r.errored = null
- r.errorEmitted = false
- r.reading = false
- r.ended = r.readable === false
- r.endEmitted = r.readable === false
- }
- if (w) {
- w.constructed = true
- w.destroyed = false
- w.closed = false
- w.closeEmitted = false
- w.errored = null
- w.errorEmitted = false
- w.finalCalled = false
- w.prefinished = false
- w.ended = w.writable === false
- w.ending = w.writable === false
- w.finished = w.writable === false
- }
- }
- function errorOrDestroy(stream, err, sync) {
- // We have tests that rely on errors being emitted
- // in the same tick, so changing this is semver major.
- // For now when you opt-in to autoDestroy we allow
- // the error to be emitted nextTick. In a future
- // semver major update we should change the default to this.
- const r = stream._readableState
- const w = stream._writableState
- if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) {
- return this
- }
- if ((r !== null && r !== undefined && r.autoDestroy) || (w !== null && w !== undefined && w.autoDestroy))
- stream.destroy(err)
- else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack // eslint-disable-line no-unused-expressions
- if (w && !w.errored) {
- w.errored = err
- }
- if (r && !r.errored) {
- r.errored = err
- }
- if (sync) {
- process.nextTick(emitErrorNT, stream, err)
- } else {
- emitErrorNT(stream, err)
- }
- }
- }
- function construct(stream, cb) {
- if (typeof stream._construct !== 'function') {
- return
- }
- const r = stream._readableState
- const w = stream._writableState
- if (r) {
- r.constructed = false
- }
- if (w) {
- w.constructed = false
- }
- stream.once(kConstruct, cb)
- if (stream.listenerCount(kConstruct) > 1) {
- // Duplex
- return
- }
- process.nextTick(constructNT, stream)
- }
- function constructNT(stream) {
- let called = false
- function onConstruct(err) {
- if (called) {
- errorOrDestroy(stream, err !== null && err !== undefined ? err : new ERR_MULTIPLE_CALLBACK())
- return
- }
- called = true
- const r = stream._readableState
- const w = stream._writableState
- const s = w || r
- if (r) {
- r.constructed = true
- }
- if (w) {
- w.constructed = true
- }
- if (s.destroyed) {
- stream.emit(kDestroy, err)
- } else if (err) {
- errorOrDestroy(stream, err, true)
- } else {
- process.nextTick(emitConstructNT, stream)
- }
- }
- try {
- stream._construct((err) => {
- process.nextTick(onConstruct, err)
- })
- } catch (err) {
- process.nextTick(onConstruct, err)
- }
- }
- function emitConstructNT(stream) {
- stream.emit(kConstruct)
- }
- function isRequest(stream) {
- return (stream === null || stream === undefined ? undefined : stream.setHeader) && typeof stream.abort === 'function'
- }
- function emitCloseLegacy(stream) {
- stream.emit('close')
- }
- function emitErrorCloseLegacy(stream, err) {
- stream.emit('error', err)
- process.nextTick(emitCloseLegacy, stream)
- }
- // Normalize destroy for legacy.
- function destroyer(stream, err) {
- if (!stream || isDestroyed(stream)) {
- return
- }
- if (!err && !isFinished(stream)) {
- err = new AbortError()
- }
- // TODO: Remove isRequest branches.
- if (isServerRequest(stream)) {
- stream.socket = null
- stream.destroy(err)
- } else if (isRequest(stream)) {
- stream.abort()
- } else if (isRequest(stream.req)) {
- stream.req.abort()
- } else if (typeof stream.destroy === 'function') {
- stream.destroy(err)
- } else if (typeof stream.close === 'function') {
- // TODO: Don't lose err?
- stream.close()
- } else if (err) {
- process.nextTick(emitErrorCloseLegacy, stream, err)
- } else {
- process.nextTick(emitCloseLegacy, stream)
- }
- if (!stream.destroyed) {
- stream[kIsDestroyed] = true
- }
- }
- module.exports = {
- construct,
- destroyer,
- destroy,
- undestroy,
- errorOrDestroy
- }
|