end-of-stream.js 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. // Ported from https://github.com/mafintosh/end-of-stream with
  2. // permission from the author, Mathias Buus (@mafintosh).
  3. 'use strict'
  4. /* replacement start */
  5. const process = require('process/')
  6. /* replacement end */
  7. const { AbortError, codes } = require('../../ours/errors')
  8. const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes
  9. const { kEmptyObject, once } = require('../../ours/util')
  10. const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators')
  11. const { Promise, PromisePrototypeThen, SymbolDispose } = require('../../ours/primordials')
  12. const {
  13. isClosed,
  14. isReadable,
  15. isReadableNodeStream,
  16. isReadableStream,
  17. isReadableFinished,
  18. isReadableErrored,
  19. isWritable,
  20. isWritableNodeStream,
  21. isWritableStream,
  22. isWritableFinished,
  23. isWritableErrored,
  24. isNodeStream,
  25. willEmitClose: _willEmitClose,
  26. kIsClosedPromise
  27. } = require('./utils')
  28. let addAbortListener
  29. function isRequest(stream) {
  30. return stream.setHeader && typeof stream.abort === 'function'
  31. }
  32. const nop = () => {}
  33. function eos(stream, options, callback) {
  34. var _options$readable, _options$writable
  35. if (arguments.length === 2) {
  36. callback = options
  37. options = kEmptyObject
  38. } else if (options == null) {
  39. options = kEmptyObject
  40. } else {
  41. validateObject(options, 'options')
  42. }
  43. validateFunction(callback, 'callback')
  44. validateAbortSignal(options.signal, 'options.signal')
  45. callback = once(callback)
  46. if (isReadableStream(stream) || isWritableStream(stream)) {
  47. return eosWeb(stream, options, callback)
  48. }
  49. if (!isNodeStream(stream)) {
  50. throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream)
  51. }
  52. const readable =
  53. (_options$readable = options.readable) !== null && _options$readable !== undefined
  54. ? _options$readable
  55. : isReadableNodeStream(stream)
  56. const writable =
  57. (_options$writable = options.writable) !== null && _options$writable !== undefined
  58. ? _options$writable
  59. : isWritableNodeStream(stream)
  60. const wState = stream._writableState
  61. const rState = stream._readableState
  62. const onlegacyfinish = () => {
  63. if (!stream.writable) {
  64. onfinish()
  65. }
  66. }
  67. // TODO (ronag): Improve soft detection to include core modules and
  68. // common ecosystem modules that do properly emit 'close' but fail
  69. // this generic check.
  70. let willEmitClose =
  71. _willEmitClose(stream) && isReadableNodeStream(stream) === readable && isWritableNodeStream(stream) === writable
  72. let writableFinished = isWritableFinished(stream, false)
  73. const onfinish = () => {
  74. writableFinished = true
  75. // Stream should not be destroyed here. If it is that
  76. // means that user space is doing something differently and
  77. // we cannot trust willEmitClose.
  78. if (stream.destroyed) {
  79. willEmitClose = false
  80. }
  81. if (willEmitClose && (!stream.readable || readable)) {
  82. return
  83. }
  84. if (!readable || readableFinished) {
  85. callback.call(stream)
  86. }
  87. }
  88. let readableFinished = isReadableFinished(stream, false)
  89. const onend = () => {
  90. readableFinished = true
  91. // Stream should not be destroyed here. If it is that
  92. // means that user space is doing something differently and
  93. // we cannot trust willEmitClose.
  94. if (stream.destroyed) {
  95. willEmitClose = false
  96. }
  97. if (willEmitClose && (!stream.writable || writable)) {
  98. return
  99. }
  100. if (!writable || writableFinished) {
  101. callback.call(stream)
  102. }
  103. }
  104. const onerror = (err) => {
  105. callback.call(stream, err)
  106. }
  107. let closed = isClosed(stream)
  108. const onclose = () => {
  109. closed = true
  110. const errored = isWritableErrored(stream) || isReadableErrored(stream)
  111. if (errored && typeof errored !== 'boolean') {
  112. return callback.call(stream, errored)
  113. }
  114. if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
  115. if (!isReadableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
  116. }
  117. if (writable && !writableFinished) {
  118. if (!isWritableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE())
  119. }
  120. callback.call(stream)
  121. }
  122. const onclosed = () => {
  123. closed = true
  124. const errored = isWritableErrored(stream) || isReadableErrored(stream)
  125. if (errored && typeof errored !== 'boolean') {
  126. return callback.call(stream, errored)
  127. }
  128. callback.call(stream)
  129. }
  130. const onrequest = () => {
  131. stream.req.on('finish', onfinish)
  132. }
  133. if (isRequest(stream)) {
  134. stream.on('complete', onfinish)
  135. if (!willEmitClose) {
  136. stream.on('abort', onclose)
  137. }
  138. if (stream.req) {
  139. onrequest()
  140. } else {
  141. stream.on('request', onrequest)
  142. }
  143. } else if (writable && !wState) {
  144. // legacy streams
  145. stream.on('end', onlegacyfinish)
  146. stream.on('close', onlegacyfinish)
  147. }
  148. // Not all streams will emit 'close' after 'aborted'.
  149. if (!willEmitClose && typeof stream.aborted === 'boolean') {
  150. stream.on('aborted', onclose)
  151. }
  152. stream.on('end', onend)
  153. stream.on('finish', onfinish)
  154. if (options.error !== false) {
  155. stream.on('error', onerror)
  156. }
  157. stream.on('close', onclose)
  158. if (closed) {
  159. process.nextTick(onclose)
  160. } else if (
  161. (wState !== null && wState !== undefined && wState.errorEmitted) ||
  162. (rState !== null && rState !== undefined && rState.errorEmitted)
  163. ) {
  164. if (!willEmitClose) {
  165. process.nextTick(onclosed)
  166. }
  167. } else if (
  168. !readable &&
  169. (!willEmitClose || isReadable(stream)) &&
  170. (writableFinished || isWritable(stream) === false)
  171. ) {
  172. process.nextTick(onclosed)
  173. } else if (
  174. !writable &&
  175. (!willEmitClose || isWritable(stream)) &&
  176. (readableFinished || isReadable(stream) === false)
  177. ) {
  178. process.nextTick(onclosed)
  179. } else if (rState && stream.req && stream.aborted) {
  180. process.nextTick(onclosed)
  181. }
  182. const cleanup = () => {
  183. callback = nop
  184. stream.removeListener('aborted', onclose)
  185. stream.removeListener('complete', onfinish)
  186. stream.removeListener('abort', onclose)
  187. stream.removeListener('request', onrequest)
  188. if (stream.req) stream.req.removeListener('finish', onfinish)
  189. stream.removeListener('end', onlegacyfinish)
  190. stream.removeListener('close', onlegacyfinish)
  191. stream.removeListener('finish', onfinish)
  192. stream.removeListener('end', onend)
  193. stream.removeListener('error', onerror)
  194. stream.removeListener('close', onclose)
  195. }
  196. if (options.signal && !closed) {
  197. const abort = () => {
  198. // Keep it because cleanup removes it.
  199. const endCallback = callback
  200. cleanup()
  201. endCallback.call(
  202. stream,
  203. new AbortError(undefined, {
  204. cause: options.signal.reason
  205. })
  206. )
  207. }
  208. if (options.signal.aborted) {
  209. process.nextTick(abort)
  210. } else {
  211. addAbortListener = addAbortListener || require('../../ours/util').addAbortListener
  212. const disposable = addAbortListener(options.signal, abort)
  213. const originalCallback = callback
  214. callback = once((...args) => {
  215. disposable[SymbolDispose]()
  216. originalCallback.apply(stream, args)
  217. })
  218. }
  219. }
  220. return cleanup
  221. }
  222. function eosWeb(stream, options, callback) {
  223. let isAborted = false
  224. let abort = nop
  225. if (options.signal) {
  226. abort = () => {
  227. isAborted = true
  228. callback.call(
  229. stream,
  230. new AbortError(undefined, {
  231. cause: options.signal.reason
  232. })
  233. )
  234. }
  235. if (options.signal.aborted) {
  236. process.nextTick(abort)
  237. } else {
  238. addAbortListener = addAbortListener || require('../../ours/util').addAbortListener
  239. const disposable = addAbortListener(options.signal, abort)
  240. const originalCallback = callback
  241. callback = once((...args) => {
  242. disposable[SymbolDispose]()
  243. originalCallback.apply(stream, args)
  244. })
  245. }
  246. }
  247. const resolverFn = (...args) => {
  248. if (!isAborted) {
  249. process.nextTick(() => callback.apply(stream, args))
  250. }
  251. }
  252. PromisePrototypeThen(stream[kIsClosedPromise].promise, resolverFn, resolverFn)
  253. return nop
  254. }
  255. function finished(stream, opts) {
  256. var _opts
  257. let autoCleanup = false
  258. if (opts === null) {
  259. opts = kEmptyObject
  260. }
  261. if ((_opts = opts) !== null && _opts !== undefined && _opts.cleanup) {
  262. validateBoolean(opts.cleanup, 'cleanup')
  263. autoCleanup = opts.cleanup
  264. }
  265. return new Promise((resolve, reject) => {
  266. const cleanup = eos(stream, opts, (err) => {
  267. if (autoCleanup) {
  268. cleanup()
  269. }
  270. if (err) {
  271. reject(err)
  272. } else {
  273. resolve()
  274. }
  275. })
  276. })
  277. }
  278. module.exports = eos
  279. module.exports.finished = finished