| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- // Ported from https://github.com/mafintosh/end-of-stream with
- // permission from the author, Mathias Buus (@mafintosh).
- 'use strict'
- /* replacement start */
- const process = require('process/')
- /* replacement end */
- const { AbortError, codes } = require('../../ours/errors')
- const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes
- const { kEmptyObject, once } = require('../../ours/util')
- const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators')
- const { Promise, PromisePrototypeThen, SymbolDispose } = require('../../ours/primordials')
- const {
- isClosed,
- isReadable,
- isReadableNodeStream,
- isReadableStream,
- isReadableFinished,
- isReadableErrored,
- isWritable,
- isWritableNodeStream,
- isWritableStream,
- isWritableFinished,
- isWritableErrored,
- isNodeStream,
- willEmitClose: _willEmitClose,
- kIsClosedPromise
- } = require('./utils')
- let addAbortListener
/**
 * Duck-type check for request-like objects: something exposing a truthy
 * `setHeader` member together with an `abort()` method.
 *
 * @param {object} stream Candidate object.
 * @returns {*} The falsy `setHeader` value when that member is missing,
 *   otherwise a boolean telling whether `abort` is a function. Callers only
 *   rely on truthiness.
 */
function isRequest(stream) {
  const headerSetter = stream.setHeader
  if (!headerSetter) {
    return headerSetter
  }
  return typeof stream.abort === 'function'
}
// Shared do-nothing function: used as the neutralised callback after cleanup
// and as the cleanup handle returned for web streams.
const nop = () => undefined
/**
 * Watches `stream` and invokes `callback` at most once (with `this` bound to
 * the stream) when the stream has ended / finished / closed, has errored, or
 * was closed before completing.
 *
 * Web streams (as detected by `isReadableStream` / `isWritableStream`) are
 * delegated to `eosWeb`.
 *
 * @param {object} stream A Node.js stream or a web ReadableStream/WritableStream.
 * @param {object} [options]
 * @param {boolean} [options.readable] Whether to wait for the readable side;
 *   defaults to `isReadableNodeStream(stream)`.
 * @param {boolean} [options.writable] Whether to wait for the writable side;
 *   defaults to `isWritableNodeStream(stream)`.
 * @param {boolean} [options.error] When exactly `false`, no 'error' listener
 *   is attached.
 * @param {AbortSignal} [options.signal] When it aborts, all listeners are
 *   removed and `callback` receives an `AbortError` whose `cause` is
 *   `signal.reason`.
 * @param {Function} callback Called with no argument on success or with an
 *   error. May be passed as the second argument when `options` is omitted.
 * @returns {Function} A cleanup function that removes every listener added
 *   here (including the abort listener on `options.signal`) and silences the
 *   callback.
 * @throws {ERR_INVALID_ARG_TYPE} When `stream` is neither a web stream nor a
 *   Node.js stream. The argument validators may throw as well.
 */
function eos(stream, options, callback) {
  // `arguments.length` (not `callback === undefined`) distinguishes
  // eos(stream, cb) from eos(stream, undefined, cb).
  if (arguments.length === 2) {
    callback = options
    options = kEmptyObject
  } else if (options == null) {
    options = kEmptyObject
  } else {
    validateObject(options, 'options')
  }
  validateFunction(callback, 'callback')
  validateAbortSignal(options.signal, 'options.signal')
  callback = once(callback)
  if (isReadableStream(stream) || isWritableStream(stream)) {
    return eosWeb(stream, options, callback)
  }
  if (!isNodeStream(stream)) {
    throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream)
  }
  // Explicit options win; only null/undefined fall back to detection.
  const readableOption = options.readable
  const readable =
    readableOption !== null && readableOption !== undefined ? readableOption : isReadableNodeStream(stream)
  const writableOption = options.writable
  const writable =
    writableOption !== null && writableOption !== undefined ? writableOption : isWritableNodeStream(stream)
  const wState = stream._writableState
  const rState = stream._readableState
  // Legacy streams have no 'finish'; treat 'end'/'close' as finish once the
  // stream reports it is no longer writable.
  const onlegacyfinish = () => {
    if (!stream.writable) {
      onfinish()
    }
  }

  // TODO (ronag): Improve soft detection to include core modules and
  // common ecosystem modules that do properly emit 'close' but fail
  // this generic check.
  let willEmitClose =
    _willEmitClose(stream) && isReadableNodeStream(stream) === readable && isWritableNodeStream(stream) === writable
  let writableFinished = isWritableFinished(stream, false)
  const onfinish = () => {
    writableFinished = true
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) {
      willEmitClose = false
    }
    // Defer to the upcoming 'close' event when it can be trusted.
    if (willEmitClose && (!stream.readable || readable)) {
      return
    }
    if (!readable || readableFinished) {
      callback.call(stream)
    }
  }
  let readableFinished = isReadableFinished(stream, false)
  const onend = () => {
    readableFinished = true
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) {
      willEmitClose = false
    }
    // Defer to the upcoming 'close' event when it can be trusted.
    if (willEmitClose && (!stream.writable || writable)) {
      return
    }
    if (!writable || writableFinished) {
      callback.call(stream)
    }
  }
  const onerror = (err) => {
    callback.call(stream, err)
  }
  let closed = isClosed(stream)
  // 'close' handler: report a stored error, a premature close, or success.
  const onclose = () => {
    closed = true
    const errored = isWritableErrored(stream) || isReadableErrored(stream)
    if (errored && typeof errored !== 'boolean') {
      return callback.call(stream, errored)
    }
    if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
      if (!isReadableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
    }
    if (writable && !writableFinished) {
      if (!isWritableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
    }
    callback.call(stream)
  }
  // Variant used for streams that are already done when eos() is called:
  // reports a stored error or success, without the premature-close checks.
  const onclosed = () => {
    closed = true
    const errored = isWritableErrored(stream) || isReadableErrored(stream)
    if (errored && typeof errored !== 'boolean') {
      return callback.call(stream, errored)
    }
    callback.call(stream)
  }
  const onrequest = () => {
    stream.req.on('finish', onfinish)
  }
  if (isRequest(stream)) {
    stream.on('complete', onfinish)
    if (!willEmitClose) {
      stream.on('abort', onclose)
    }
    if (stream.req) {
      onrequest()
    } else {
      stream.on('request', onrequest)
    }
  } else if (writable && !wState) {
    // legacy streams
    stream.on('end', onlegacyfinish)
    stream.on('close', onlegacyfinish)
  }

  // Not all streams will emit 'close' after 'aborted'.
  if (!willEmitClose && typeof stream.aborted === 'boolean') {
    stream.on('aborted', onclose)
  }
  stream.on('end', onend)
  stream.on('finish', onfinish)
  if (options.error !== false) {
    stream.on('error', onerror)
  }
  stream.on('close', onclose)

  // The stream may already be in a terminal state; in that case no further
  // event will arrive, so the callback is scheduled on the next tick.
  if (closed) {
    process.nextTick(onclose)
  } else if (
    (wState !== null && wState !== undefined && wState.errorEmitted) ||
    (rState !== null && rState !== undefined && rState.errorEmitted)
  ) {
    if (!willEmitClose) {
      process.nextTick(onclosed)
    }
  } else if (
    !readable &&
    (!willEmitClose || isReadable(stream)) &&
    (writableFinished || isWritable(stream) === false)
  ) {
    process.nextTick(onclosed)
  } else if (
    !writable &&
    (!willEmitClose || isWritable(stream)) &&
    (readableFinished || isReadable(stream) === false)
  ) {
    process.nextTick(onclosed)
  } else if (rState && stream.req && stream.aborted) {
    process.nextTick(onclosed)
  }

  // Detaches the abort listener from options.signal. Stays a no-op unless a
  // listener is actually registered below, and resets itself after use so the
  // registration is disposed exactly once.
  let disposeAbortListener = nop
  const cleanup = () => {
    callback = nop
    // Without this, calling cleanup() before the stream settles would leave
    // the abort listener attached to a (possibly long-lived) signal.
    disposeAbortListener()
    stream.removeListener('aborted', onclose)
    stream.removeListener('complete', onfinish)
    stream.removeListener('abort', onclose)
    stream.removeListener('request', onrequest)
    if (stream.req) stream.req.removeListener('finish', onfinish)
    stream.removeListener('end', onlegacyfinish)
    stream.removeListener('close', onlegacyfinish)
    stream.removeListener('finish', onfinish)
    stream.removeListener('end', onend)
    stream.removeListener('error', onerror)
    stream.removeListener('close', onclose)
  }
  if (options.signal && !closed) {
    const abort = () => {
      // Keep it because cleanup removes it.
      const endCallback = callback
      cleanup()
      endCallback.call(
        stream,
        new AbortError(undefined, {
          cause: options.signal.reason
        })
      )
    }
    if (options.signal.aborted) {
      process.nextTick(abort)
    } else {
      // Loaded lazily on first use and cached in the module-level binding.
      addAbortListener = addAbortListener || require('../../ours/util').addAbortListener
      const disposable = addAbortListener(options.signal, abort)
      disposeAbortListener = () => {
        disposeAbortListener = nop
        disposable[SymbolDispose]()
      }
      const originalCallback = callback
      // Whichever way the callback ends up firing, stop listening for abort.
      callback = once((...args) => {
        disposeAbortListener()
        originalCallback.apply(stream, args)
      })
    }
  }
  return cleanup
}
- function eosWeb(stream, options, callback) {
- let isAborted = false
- let abort = nop
- if (options.signal) {
- abort = () => {
- isAborted = true
- callback.call(
- stream,
- new AbortError(undefined, {
- cause: options.signal.reason
- })
- )
- }
- if (options.signal.aborted) {
- process.nextTick(abort)
- } else {
- addAbortListener = addAbortListener || require('../../ours/util').addAbortListener
- const disposable = addAbortListener(options.signal, abort)
- const originalCallback = callback
- callback = once((...args) => {
- disposable[SymbolDispose]()
- originalCallback.apply(stream, args)
- })
- }
- }
- const resolverFn = (...args) => {
- if (!isAborted) {
- process.nextTick(() => callback.apply(stream, args))
- }
- }
- PromisePrototypeThen(stream[kIsClosedPromise].promise, resolverFn, resolverFn)
- return nop
- }
/**
 * Promise flavour of `eos`: resolves (with no value) when `eos` reports
 * success and rejects with the error it reports otherwise.
 *
 * @param {object} stream Stream handed to `eos`.
 * @param {object} [opts] Options forwarded to `eos`; `null` is replaced by the
 *   shared empty options object.
 * @param {boolean} [opts.cleanup] When truthy it must be a boolean (checked
 *   with `validateBoolean`); if `true`, the listeners installed by `eos` are
 *   removed before the promise settles.
 * @returns {Promise<void>}
 */
function finished(stream, opts) {
  if (opts === null) {
    opts = kEmptyObject
  }

  let autoCleanup = false
  // `opts` can no longer be null here, so only undefined needs guarding.
  if (opts !== undefined && opts.cleanup) {
    validateBoolean(opts.cleanup, 'cleanup')
    autoCleanup = opts.cleanup
  }

  return new Promise((resolve, reject) => {
    const onDone = (err) => {
      if (autoCleanup) {
        cleanup()
      }
      if (!err) {
        resolve()
        return
      }
      reject(err)
    }
    // eos never calls back synchronously, so `cleanup` is initialised by the
    // time `onDone` runs.
    const cleanup = eos(stream, opts, onDone)
  })
}
// Public surface: the callback API is the module itself, with the promise
// API hanging off it as `.finished`.
eos.finished = finished
module.exports = eos
|