destroy.js 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. 'use strict'
  2. /* replacement start */
  3. const process = require('process/')
  4. /* replacement end */
  5. const {
  6. aggregateTwoErrors,
  7. codes: { ERR_MULTIPLE_CALLBACK },
  8. AbortError
  9. } = require('../../ours/errors')
  10. const { Symbol } = require('../../ours/primordials')
  11. const { kIsDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils')
  12. const kDestroy = Symbol('kDestroy')
  13. const kConstruct = Symbol('kConstruct')
  14. function checkError(err, w, r) {
  15. if (err) {
  16. // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
  17. err.stack // eslint-disable-line no-unused-expressions
  18. if (w && !w.errored) {
  19. w.errored = err
  20. }
  21. if (r && !r.errored) {
  22. r.errored = err
  23. }
  24. }
  25. }
  26. // Backwards compat. cb() is undocumented and unused in core but
  27. // unfortunately might be used by modules.
  28. function destroy(err, cb) {
  29. const r = this._readableState
  30. const w = this._writableState
  31. // With duplex streams we use the writable side for state.
  32. const s = w || r
  33. if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) {
  34. if (typeof cb === 'function') {
  35. cb()
  36. }
  37. return this
  38. }
  39. // We set destroyed to true before firing error callbacks in order
  40. // to make it re-entrance safe in case destroy() is called within callbacks
  41. checkError(err, w, r)
  42. if (w) {
  43. w.destroyed = true
  44. }
  45. if (r) {
  46. r.destroyed = true
  47. }
  48. // If still constructing then defer calling _destroy.
  49. if (!s.constructed) {
  50. this.once(kDestroy, function (er) {
  51. _destroy(this, aggregateTwoErrors(er, err), cb)
  52. })
  53. } else {
  54. _destroy(this, err, cb)
  55. }
  56. return this
  57. }
  58. function _destroy(self, err, cb) {
  59. let called = false
  60. function onDestroy(err) {
  61. if (called) {
  62. return
  63. }
  64. called = true
  65. const r = self._readableState
  66. const w = self._writableState
  67. checkError(err, w, r)
  68. if (w) {
  69. w.closed = true
  70. }
  71. if (r) {
  72. r.closed = true
  73. }
  74. if (typeof cb === 'function') {
  75. cb(err)
  76. }
  77. if (err) {
  78. process.nextTick(emitErrorCloseNT, self, err)
  79. } else {
  80. process.nextTick(emitCloseNT, self)
  81. }
  82. }
  83. try {
  84. self._destroy(err || null, onDestroy)
  85. } catch (err) {
  86. onDestroy(err)
  87. }
  88. }
  89. function emitErrorCloseNT(self, err) {
  90. emitErrorNT(self, err)
  91. emitCloseNT(self)
  92. }
  93. function emitCloseNT(self) {
  94. const r = self._readableState
  95. const w = self._writableState
  96. if (w) {
  97. w.closeEmitted = true
  98. }
  99. if (r) {
  100. r.closeEmitted = true
  101. }
  102. if ((w !== null && w !== undefined && w.emitClose) || (r !== null && r !== undefined && r.emitClose)) {
  103. self.emit('close')
  104. }
  105. }
  106. function emitErrorNT(self, err) {
  107. const r = self._readableState
  108. const w = self._writableState
  109. if ((w !== null && w !== undefined && w.errorEmitted) || (r !== null && r !== undefined && r.errorEmitted)) {
  110. return
  111. }
  112. if (w) {
  113. w.errorEmitted = true
  114. }
  115. if (r) {
  116. r.errorEmitted = true
  117. }
  118. self.emit('error', err)
  119. }
  120. function undestroy() {
  121. const r = this._readableState
  122. const w = this._writableState
  123. if (r) {
  124. r.constructed = true
  125. r.closed = false
  126. r.closeEmitted = false
  127. r.destroyed = false
  128. r.errored = null
  129. r.errorEmitted = false
  130. r.reading = false
  131. r.ended = r.readable === false
  132. r.endEmitted = r.readable === false
  133. }
  134. if (w) {
  135. w.constructed = true
  136. w.destroyed = false
  137. w.closed = false
  138. w.closeEmitted = false
  139. w.errored = null
  140. w.errorEmitted = false
  141. w.finalCalled = false
  142. w.prefinished = false
  143. w.ended = w.writable === false
  144. w.ending = w.writable === false
  145. w.finished = w.writable === false
  146. }
  147. }
  148. function errorOrDestroy(stream, err, sync) {
  149. // We have tests that rely on errors being emitted
  150. // in the same tick, so changing this is semver major.
  151. // For now when you opt-in to autoDestroy we allow
  152. // the error to be emitted nextTick. In a future
  153. // semver major update we should change the default to this.
  154. const r = stream._readableState
  155. const w = stream._writableState
  156. if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) {
  157. return this
  158. }
  159. if ((r !== null && r !== undefined && r.autoDestroy) || (w !== null && w !== undefined && w.autoDestroy))
  160. stream.destroy(err)
  161. else if (err) {
  162. // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
  163. err.stack // eslint-disable-line no-unused-expressions
  164. if (w && !w.errored) {
  165. w.errored = err
  166. }
  167. if (r && !r.errored) {
  168. r.errored = err
  169. }
  170. if (sync) {
  171. process.nextTick(emitErrorNT, stream, err)
  172. } else {
  173. emitErrorNT(stream, err)
  174. }
  175. }
  176. }
  177. function construct(stream, cb) {
  178. if (typeof stream._construct !== 'function') {
  179. return
  180. }
  181. const r = stream._readableState
  182. const w = stream._writableState
  183. if (r) {
  184. r.constructed = false
  185. }
  186. if (w) {
  187. w.constructed = false
  188. }
  189. stream.once(kConstruct, cb)
  190. if (stream.listenerCount(kConstruct) > 1) {
  191. // Duplex
  192. return
  193. }
  194. process.nextTick(constructNT, stream)
  195. }
  196. function constructNT(stream) {
  197. let called = false
  198. function onConstruct(err) {
  199. if (called) {
  200. errorOrDestroy(stream, err !== null && err !== undefined ? err : new ERR_MULTIPLE_CALLBACK())
  201. return
  202. }
  203. called = true
  204. const r = stream._readableState
  205. const w = stream._writableState
  206. const s = w || r
  207. if (r) {
  208. r.constructed = true
  209. }
  210. if (w) {
  211. w.constructed = true
  212. }
  213. if (s.destroyed) {
  214. stream.emit(kDestroy, err)
  215. } else if (err) {
  216. errorOrDestroy(stream, err, true)
  217. } else {
  218. process.nextTick(emitConstructNT, stream)
  219. }
  220. }
  221. try {
  222. stream._construct((err) => {
  223. process.nextTick(onConstruct, err)
  224. })
  225. } catch (err) {
  226. process.nextTick(onConstruct, err)
  227. }
  228. }
  229. function emitConstructNT(stream) {
  230. stream.emit(kConstruct)
  231. }
  232. function isRequest(stream) {
  233. return (stream === null || stream === undefined ? undefined : stream.setHeader) && typeof stream.abort === 'function'
  234. }
  235. function emitCloseLegacy(stream) {
  236. stream.emit('close')
  237. }
  238. function emitErrorCloseLegacy(stream, err) {
  239. stream.emit('error', err)
  240. process.nextTick(emitCloseLegacy, stream)
  241. }
  242. // Normalize destroy for legacy.
  243. function destroyer(stream, err) {
  244. if (!stream || isDestroyed(stream)) {
  245. return
  246. }
  247. if (!err && !isFinished(stream)) {
  248. err = new AbortError()
  249. }
  250. // TODO: Remove isRequest branches.
  251. if (isServerRequest(stream)) {
  252. stream.socket = null
  253. stream.destroy(err)
  254. } else if (isRequest(stream)) {
  255. stream.abort()
  256. } else if (isRequest(stream.req)) {
  257. stream.req.abort()
  258. } else if (typeof stream.destroy === 'function') {
  259. stream.destroy(err)
  260. } else if (typeof stream.close === 'function') {
  261. // TODO: Don't lose err?
  262. stream.close()
  263. } else if (err) {
  264. process.nextTick(emitErrorCloseLegacy, stream, err)
  265. } else {
  266. process.nextTick(emitCloseLegacy, stream)
  267. }
  268. if (!stream.destroyed) {
  269. stream[kIsDestroyed] = true
  270. }
  271. }
  272. module.exports = {
  273. construct,
  274. destroyer,
  275. destroy,
  276. undestroy,
  277. errorOrDestroy
  278. }