writable.js 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819
  1. // Copyright Joyent, Inc. and other Node contributors.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a
  4. // copy of this software and associated documentation files (the
  5. // "Software"), to deal in the Software without restriction, including
  6. // without limitation the rights to use, copy, modify, merge, publish,
  7. // distribute, sublicense, and/or sell copies of the Software, and to permit
  8. // persons to whom the Software is furnished to do so, subject to the
  9. // following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included
  12. // in all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  15. // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  16. // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
  17. // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
  18. // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
  19. // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
  20. // USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. // A bit simpler than readable streams.
  22. // Implement an async ._write(chunk, encoding, cb), and it'll handle all
  23. // the drain event emission and buffering.
  24. 'use strict'
  25. /* replacement start */
  26. const process = require('process/')
  27. /* replacement end */
  28. const {
  29. ArrayPrototypeSlice,
  30. Error,
  31. FunctionPrototypeSymbolHasInstance,
  32. ObjectDefineProperty,
  33. ObjectDefineProperties,
  34. ObjectSetPrototypeOf,
  35. StringPrototypeToLowerCase,
  36. Symbol,
  37. SymbolHasInstance
  38. } = require('../../ours/primordials')
  39. module.exports = Writable
  40. Writable.WritableState = WritableState
  41. const { EventEmitter: EE } = require('events')
  42. const Stream = require('./legacy').Stream
  43. const { Buffer } = require('buffer')
  44. const destroyImpl = require('./destroy')
  45. const { addAbortSignal } = require('./add-abort-signal')
  46. const { getHighWaterMark, getDefaultHighWaterMark } = require('./state')
  47. const {
  48. ERR_INVALID_ARG_TYPE,
  49. ERR_METHOD_NOT_IMPLEMENTED,
  50. ERR_MULTIPLE_CALLBACK,
  51. ERR_STREAM_CANNOT_PIPE,
  52. ERR_STREAM_DESTROYED,
  53. ERR_STREAM_ALREADY_FINISHED,
  54. ERR_STREAM_NULL_VALUES,
  55. ERR_STREAM_WRITE_AFTER_END,
  56. ERR_UNKNOWN_ENCODING
  57. } = require('../../ours/errors').codes
  58. const { errorOrDestroy } = destroyImpl
  59. ObjectSetPrototypeOf(Writable.prototype, Stream.prototype)
  60. ObjectSetPrototypeOf(Writable, Stream)
  61. function nop() {}
  62. const kOnFinished = Symbol('kOnFinished')
  63. function WritableState(options, stream, isDuplex) {
  64. // Duplex streams are both readable and writable, but share
  65. // the same options object.
  66. // However, some cases require setting options to different
  67. // values for the readable and the writable sides of the duplex stream,
  68. // e.g. options.readableObjectMode vs. options.writableObjectMode, etc.
  69. if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex')
  70. // Object stream flag to indicate whether or not this stream
  71. // contains buffers or objects.
  72. this.objectMode = !!(options && options.objectMode)
  73. if (isDuplex) this.objectMode = this.objectMode || !!(options && options.writableObjectMode)
  74. // The point at which write() starts returning false
  75. // Note: 0 is a valid value, means that we always return false if
  76. // the entire buffer is not flushed immediately on write().
  77. this.highWaterMark = options
  78. ? getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex)
  79. : getDefaultHighWaterMark(false)
  80. // if _final has been called.
  81. this.finalCalled = false
  82. // drain event flag.
  83. this.needDrain = false
  84. // At the start of calling end()
  85. this.ending = false
  86. // When end() has been called, and returned.
  87. this.ended = false
  88. // When 'finish' is emitted.
  89. this.finished = false
  90. // Has it been destroyed
  91. this.destroyed = false
  92. // Should we decode strings into buffers before passing to _write?
  93. // this is here so that some node-core streams can optimize string
  94. // handling at a lower level.
  95. const noDecode = !!(options && options.decodeStrings === false)
  96. this.decodeStrings = !noDecode
  97. // Crypto is kind of old and crusty. Historically, its default string
  98. // encoding is 'binary' so we have to make this configurable.
  99. // Everything else in the universe uses 'utf8', though.
  100. this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'
  101. // Not an actual buffer we keep track of, but a measurement
  102. // of how much we're waiting to get pushed to some underlying
  103. // socket or file.
  104. this.length = 0
  105. // A flag to see when we're in the middle of a write.
  106. this.writing = false
  107. // When true all writes will be buffered until .uncork() call.
  108. this.corked = 0
  109. // A flag to be able to tell if the onwrite cb is called immediately,
  110. // or on a later tick. We set this to true at first, because any
  111. // actions that shouldn't happen until "later" should generally also
  112. // not happen before the first write call.
  113. this.sync = true
  114. // A flag to know if we're processing previously buffered items, which
  115. // may call the _write() callback in the same tick, so that we don't
  116. // end up in an overlapped onwrite situation.
  117. this.bufferProcessing = false
  118. // The callback that's passed to _write(chunk, cb).
  119. this.onwrite = onwrite.bind(undefined, stream)
  120. // The callback that the user supplies to write(chunk, encoding, cb).
  121. this.writecb = null
  122. // The amount that is being written when _write is called.
  123. this.writelen = 0
  124. // Storage for data passed to the afterWrite() callback in case of
  125. // synchronous _write() completion.
  126. this.afterWriteTickInfo = null
  127. resetBuffer(this)
  128. // Number of pending user-supplied write callbacks
  129. // this must be 0 before 'finish' can be emitted.
  130. this.pendingcb = 0
  131. // Stream is still being constructed and cannot be
  132. // destroyed until construction finished or failed.
  133. // Async construction is opt in, therefore we start as
  134. // constructed.
  135. this.constructed = true
  136. // Emit prefinish if the only thing we're waiting for is _write cbs
  137. // This is relevant for synchronous Transform streams.
  138. this.prefinished = false
  139. // True if the error was already emitted and should not be thrown again.
  140. this.errorEmitted = false
  141. // Should close be emitted on destroy. Defaults to true.
  142. this.emitClose = !options || options.emitClose !== false
  143. // Should .destroy() be called after 'finish' (and potentially 'end').
  144. this.autoDestroy = !options || options.autoDestroy !== false
  145. // Indicates whether the stream has errored. When true all write() calls
  146. // should return false. This is needed since when autoDestroy
  147. // is disabled we need a way to tell whether the stream has failed.
  148. this.errored = null
  149. // Indicates whether the stream has finished destroying.
  150. this.closed = false
  151. // True if close has been emitted or would have been emitted
  152. // depending on emitClose.
  153. this.closeEmitted = false
  154. this[kOnFinished] = []
  155. }
  156. function resetBuffer(state) {
  157. state.buffered = []
  158. state.bufferedIndex = 0
  159. state.allBuffers = true
  160. state.allNoop = true
  161. }
  162. WritableState.prototype.getBuffer = function getBuffer() {
  163. return ArrayPrototypeSlice(this.buffered, this.bufferedIndex)
  164. }
  165. ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
  166. __proto__: null,
  167. get() {
  168. return this.buffered.length - this.bufferedIndex
  169. }
  170. })
  171. function Writable(options) {
  172. // Writable ctor is applied to Duplexes, too.
  173. // `realHasInstance` is necessary because using plain `instanceof`
  174. // would return false, as no `_writableState` property is attached.
  175. // Trying to use the custom `instanceof` for Writable here will also break the
  176. // Node.js LazyTransform implementation, which has a non-trivial getter for
  177. // `_writableState` that would lead to infinite recursion.
  178. // Checking for a Stream.Duplex instance is faster here instead of inside
  179. // the WritableState constructor, at least with V8 6.5.
  180. const isDuplex = this instanceof require('./duplex')
  181. if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) return new Writable(options)
  182. this._writableState = new WritableState(options, this, isDuplex)
  183. if (options) {
  184. if (typeof options.write === 'function') this._write = options.write
  185. if (typeof options.writev === 'function') this._writev = options.writev
  186. if (typeof options.destroy === 'function') this._destroy = options.destroy
  187. if (typeof options.final === 'function') this._final = options.final
  188. if (typeof options.construct === 'function') this._construct = options.construct
  189. if (options.signal) addAbortSignal(options.signal, this)
  190. }
  191. Stream.call(this, options)
  192. destroyImpl.construct(this, () => {
  193. const state = this._writableState
  194. if (!state.writing) {
  195. clearBuffer(this, state)
  196. }
  197. finishMaybe(this, state)
  198. })
  199. }
  200. ObjectDefineProperty(Writable, SymbolHasInstance, {
  201. __proto__: null,
  202. value: function (object) {
  203. if (FunctionPrototypeSymbolHasInstance(this, object)) return true
  204. if (this !== Writable) return false
  205. return object && object._writableState instanceof WritableState
  206. }
  207. })
  208. // Otherwise people can pipe Writable streams, which is just wrong.
  209. Writable.prototype.pipe = function () {
  210. errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE())
  211. }
  212. function _write(stream, chunk, encoding, cb) {
  213. const state = stream._writableState
  214. if (typeof encoding === 'function') {
  215. cb = encoding
  216. encoding = state.defaultEncoding
  217. } else {
  218. if (!encoding) encoding = state.defaultEncoding
  219. else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) throw new ERR_UNKNOWN_ENCODING(encoding)
  220. if (typeof cb !== 'function') cb = nop
  221. }
  222. if (chunk === null) {
  223. throw new ERR_STREAM_NULL_VALUES()
  224. } else if (!state.objectMode) {
  225. if (typeof chunk === 'string') {
  226. if (state.decodeStrings !== false) {
  227. chunk = Buffer.from(chunk, encoding)
  228. encoding = 'buffer'
  229. }
  230. } else if (chunk instanceof Buffer) {
  231. encoding = 'buffer'
  232. } else if (Stream._isUint8Array(chunk)) {
  233. chunk = Stream._uint8ArrayToBuffer(chunk)
  234. encoding = 'buffer'
  235. } else {
  236. throw new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk)
  237. }
  238. }
  239. let err
  240. if (state.ending) {
  241. err = new ERR_STREAM_WRITE_AFTER_END()
  242. } else if (state.destroyed) {
  243. err = new ERR_STREAM_DESTROYED('write')
  244. }
  245. if (err) {
  246. process.nextTick(cb, err)
  247. errorOrDestroy(stream, err, true)
  248. return err
  249. }
  250. state.pendingcb++
  251. return writeOrBuffer(stream, state, chunk, encoding, cb)
  252. }
  253. Writable.prototype.write = function (chunk, encoding, cb) {
  254. return _write(this, chunk, encoding, cb) === true
  255. }
  256. Writable.prototype.cork = function () {
  257. this._writableState.corked++
  258. }
  259. Writable.prototype.uncork = function () {
  260. const state = this._writableState
  261. if (state.corked) {
  262. state.corked--
  263. if (!state.writing) clearBuffer(this, state)
  264. }
  265. }
  266. Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  267. // node::ParseEncoding() requires lower case.
  268. if (typeof encoding === 'string') encoding = StringPrototypeToLowerCase(encoding)
  269. if (!Buffer.isEncoding(encoding)) throw new ERR_UNKNOWN_ENCODING(encoding)
  270. this._writableState.defaultEncoding = encoding
  271. return this
  272. }
  273. // If we're already writing something, then just put this
  274. // in the queue, and wait our turn. Otherwise, call _write
  275. // If we return false, then we need a drain event, so set that flag.
  276. function writeOrBuffer(stream, state, chunk, encoding, callback) {
  277. const len = state.objectMode ? 1 : chunk.length
  278. state.length += len
  279. // stream._write resets state.length
  280. const ret = state.length < state.highWaterMark
  281. // We must ensure that previous needDrain will not be reset to false.
  282. if (!ret) state.needDrain = true
  283. if (state.writing || state.corked || state.errored || !state.constructed) {
  284. state.buffered.push({
  285. chunk,
  286. encoding,
  287. callback
  288. })
  289. if (state.allBuffers && encoding !== 'buffer') {
  290. state.allBuffers = false
  291. }
  292. if (state.allNoop && callback !== nop) {
  293. state.allNoop = false
  294. }
  295. } else {
  296. state.writelen = len
  297. state.writecb = callback
  298. state.writing = true
  299. state.sync = true
  300. stream._write(chunk, encoding, state.onwrite)
  301. state.sync = false
  302. }
  303. // Return false if errored or destroyed in order to break
  304. // any synchronous while(stream.write(data)) loops.
  305. return ret && !state.errored && !state.destroyed
  306. }
  307. function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  308. state.writelen = len
  309. state.writecb = cb
  310. state.writing = true
  311. state.sync = true
  312. if (state.destroyed) state.onwrite(new ERR_STREAM_DESTROYED('write'))
  313. else if (writev) stream._writev(chunk, state.onwrite)
  314. else stream._write(chunk, encoding, state.onwrite)
  315. state.sync = false
  316. }
  317. function onwriteError(stream, state, er, cb) {
  318. --state.pendingcb
  319. cb(er)
  320. // Ensure callbacks are invoked even when autoDestroy is
  321. // not enabled. Passing `er` here doesn't make sense since
  322. // it's related to one specific write, not to the buffered
  323. // writes.
  324. errorBuffer(state)
  325. // This can emit error, but error must always follow cb.
  326. errorOrDestroy(stream, er)
  327. }
  328. function onwrite(stream, er) {
  329. const state = stream._writableState
  330. const sync = state.sync
  331. const cb = state.writecb
  332. if (typeof cb !== 'function') {
  333. errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK())
  334. return
  335. }
  336. state.writing = false
  337. state.writecb = null
  338. state.length -= state.writelen
  339. state.writelen = 0
  340. if (er) {
  341. // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
  342. er.stack // eslint-disable-line no-unused-expressions
  343. if (!state.errored) {
  344. state.errored = er
  345. }
  346. // In case of duplex streams we need to notify the readable side of the
  347. // error.
  348. if (stream._readableState && !stream._readableState.errored) {
  349. stream._readableState.errored = er
  350. }
  351. if (sync) {
  352. process.nextTick(onwriteError, stream, state, er, cb)
  353. } else {
  354. onwriteError(stream, state, er, cb)
  355. }
  356. } else {
  357. if (state.buffered.length > state.bufferedIndex) {
  358. clearBuffer(stream, state)
  359. }
  360. if (sync) {
  361. // It is a common case that the callback passed to .write() is always
  362. // the same. In that case, we do not schedule a new nextTick(), but
  363. // rather just increase a counter, to improve performance and avoid
  364. // memory allocations.
  365. if (state.afterWriteTickInfo !== null && state.afterWriteTickInfo.cb === cb) {
  366. state.afterWriteTickInfo.count++
  367. } else {
  368. state.afterWriteTickInfo = {
  369. count: 1,
  370. cb,
  371. stream,
  372. state
  373. }
  374. process.nextTick(afterWriteTick, state.afterWriteTickInfo)
  375. }
  376. } else {
  377. afterWrite(stream, state, 1, cb)
  378. }
  379. }
  380. }
  381. function afterWriteTick({ stream, state, count, cb }) {
  382. state.afterWriteTickInfo = null
  383. return afterWrite(stream, state, count, cb)
  384. }
  385. function afterWrite(stream, state, count, cb) {
  386. const needDrain = !state.ending && !stream.destroyed && state.length === 0 && state.needDrain
  387. if (needDrain) {
  388. state.needDrain = false
  389. stream.emit('drain')
  390. }
  391. while (count-- > 0) {
  392. state.pendingcb--
  393. cb()
  394. }
  395. if (state.destroyed) {
  396. errorBuffer(state)
  397. }
  398. finishMaybe(stream, state)
  399. }
  400. // If there's something in the buffer waiting, then invoke callbacks.
  401. function errorBuffer(state) {
  402. if (state.writing) {
  403. return
  404. }
  405. for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
  406. var _state$errored
  407. const { chunk, callback } = state.buffered[n]
  408. const len = state.objectMode ? 1 : chunk.length
  409. state.length -= len
  410. callback(
  411. (_state$errored = state.errored) !== null && _state$errored !== undefined
  412. ? _state$errored
  413. : new ERR_STREAM_DESTROYED('write')
  414. )
  415. }
  416. const onfinishCallbacks = state[kOnFinished].splice(0)
  417. for (let i = 0; i < onfinishCallbacks.length; i++) {
  418. var _state$errored2
  419. onfinishCallbacks[i](
  420. (_state$errored2 = state.errored) !== null && _state$errored2 !== undefined
  421. ? _state$errored2
  422. : new ERR_STREAM_DESTROYED('end')
  423. )
  424. }
  425. resetBuffer(state)
  426. }
  427. // If there's something in the buffer waiting, then process it.
  428. function clearBuffer(stream, state) {
  429. if (state.corked || state.bufferProcessing || state.destroyed || !state.constructed) {
  430. return
  431. }
  432. const { buffered, bufferedIndex, objectMode } = state
  433. const bufferedLength = buffered.length - bufferedIndex
  434. if (!bufferedLength) {
  435. return
  436. }
  437. let i = bufferedIndex
  438. state.bufferProcessing = true
  439. if (bufferedLength > 1 && stream._writev) {
  440. state.pendingcb -= bufferedLength - 1
  441. const callback = state.allNoop
  442. ? nop
  443. : (err) => {
  444. for (let n = i; n < buffered.length; ++n) {
  445. buffered[n].callback(err)
  446. }
  447. }
  448. // Make a copy of `buffered` if it's going to be used by `callback` above,
  449. // since `doWrite` will mutate the array.
  450. const chunks = state.allNoop && i === 0 ? buffered : ArrayPrototypeSlice(buffered, i)
  451. chunks.allBuffers = state.allBuffers
  452. doWrite(stream, state, true, state.length, chunks, '', callback)
  453. resetBuffer(state)
  454. } else {
  455. do {
  456. const { chunk, encoding, callback } = buffered[i]
  457. buffered[i++] = null
  458. const len = objectMode ? 1 : chunk.length
  459. doWrite(stream, state, false, len, chunk, encoding, callback)
  460. } while (i < buffered.length && !state.writing)
  461. if (i === buffered.length) {
  462. resetBuffer(state)
  463. } else if (i > 256) {
  464. buffered.splice(0, i)
  465. state.bufferedIndex = 0
  466. } else {
  467. state.bufferedIndex = i
  468. }
  469. }
  470. state.bufferProcessing = false
  471. }
  472. Writable.prototype._write = function (chunk, encoding, cb) {
  473. if (this._writev) {
  474. this._writev(
  475. [
  476. {
  477. chunk,
  478. encoding
  479. }
  480. ],
  481. cb
  482. )
  483. } else {
  484. throw new ERR_METHOD_NOT_IMPLEMENTED('_write()')
  485. }
  486. }
  487. Writable.prototype._writev = null
  488. Writable.prototype.end = function (chunk, encoding, cb) {
  489. const state = this._writableState
  490. if (typeof chunk === 'function') {
  491. cb = chunk
  492. chunk = null
  493. encoding = null
  494. } else if (typeof encoding === 'function') {
  495. cb = encoding
  496. encoding = null
  497. }
  498. let err
  499. if (chunk !== null && chunk !== undefined) {
  500. const ret = _write(this, chunk, encoding)
  501. if (ret instanceof Error) {
  502. err = ret
  503. }
  504. }
  505. // .end() fully uncorks.
  506. if (state.corked) {
  507. state.corked = 1
  508. this.uncork()
  509. }
  510. if (err) {
  511. // Do nothing...
  512. } else if (!state.errored && !state.ending) {
  513. // This is forgiving in terms of unnecessary calls to end() and can hide
  514. // logic errors. However, usually such errors are harmless and causing a
  515. // hard error can be disproportionately destructive. It is not always
  516. // trivial for the user to determine whether end() needs to be called
  517. // or not.
  518. state.ending = true
  519. finishMaybe(this, state, true)
  520. state.ended = true
  521. } else if (state.finished) {
  522. err = new ERR_STREAM_ALREADY_FINISHED('end')
  523. } else if (state.destroyed) {
  524. err = new ERR_STREAM_DESTROYED('end')
  525. }
  526. if (typeof cb === 'function') {
  527. if (err || state.finished) {
  528. process.nextTick(cb, err)
  529. } else {
  530. state[kOnFinished].push(cb)
  531. }
  532. }
  533. return this
  534. }
  535. function needFinish(state) {
  536. return (
  537. state.ending &&
  538. !state.destroyed &&
  539. state.constructed &&
  540. state.length === 0 &&
  541. !state.errored &&
  542. state.buffered.length === 0 &&
  543. !state.finished &&
  544. !state.writing &&
  545. !state.errorEmitted &&
  546. !state.closeEmitted
  547. )
  548. }
  549. function callFinal(stream, state) {
  550. let called = false
  551. function onFinish(err) {
  552. if (called) {
  553. errorOrDestroy(stream, err !== null && err !== undefined ? err : ERR_MULTIPLE_CALLBACK())
  554. return
  555. }
  556. called = true
  557. state.pendingcb--
  558. if (err) {
  559. const onfinishCallbacks = state[kOnFinished].splice(0)
  560. for (let i = 0; i < onfinishCallbacks.length; i++) {
  561. onfinishCallbacks[i](err)
  562. }
  563. errorOrDestroy(stream, err, state.sync)
  564. } else if (needFinish(state)) {
  565. state.prefinished = true
  566. stream.emit('prefinish')
  567. // Backwards compat. Don't check state.sync here.
  568. // Some streams assume 'finish' will be emitted
  569. // asynchronously relative to _final callback.
  570. state.pendingcb++
  571. process.nextTick(finish, stream, state)
  572. }
  573. }
  574. state.sync = true
  575. state.pendingcb++
  576. try {
  577. stream._final(onFinish)
  578. } catch (err) {
  579. onFinish(err)
  580. }
  581. state.sync = false
  582. }
  583. function prefinish(stream, state) {
  584. if (!state.prefinished && !state.finalCalled) {
  585. if (typeof stream._final === 'function' && !state.destroyed) {
  586. state.finalCalled = true
  587. callFinal(stream, state)
  588. } else {
  589. state.prefinished = true
  590. stream.emit('prefinish')
  591. }
  592. }
  593. }
  594. function finishMaybe(stream, state, sync) {
  595. if (needFinish(state)) {
  596. prefinish(stream, state)
  597. if (state.pendingcb === 0) {
  598. if (sync) {
  599. state.pendingcb++
  600. process.nextTick(
  601. (stream, state) => {
  602. if (needFinish(state)) {
  603. finish(stream, state)
  604. } else {
  605. state.pendingcb--
  606. }
  607. },
  608. stream,
  609. state
  610. )
  611. } else if (needFinish(state)) {
  612. state.pendingcb++
  613. finish(stream, state)
  614. }
  615. }
  616. }
  617. }
  618. function finish(stream, state) {
  619. state.pendingcb--
  620. state.finished = true
  621. const onfinishCallbacks = state[kOnFinished].splice(0)
  622. for (let i = 0; i < onfinishCallbacks.length; i++) {
  623. onfinishCallbacks[i]()
  624. }
  625. stream.emit('finish')
  626. if (state.autoDestroy) {
  627. // In case of duplex streams we need a way to detect
  628. // if the readable side is ready for autoDestroy as well.
  629. const rState = stream._readableState
  630. const autoDestroy =
  631. !rState ||
  632. (rState.autoDestroy &&
  633. // We don't expect the readable to ever 'end'
  634. // if readable is explicitly set to false.
  635. (rState.endEmitted || rState.readable === false))
  636. if (autoDestroy) {
  637. stream.destroy()
  638. }
  639. }
  640. }
  641. ObjectDefineProperties(Writable.prototype, {
  642. closed: {
  643. __proto__: null,
  644. get() {
  645. return this._writableState ? this._writableState.closed : false
  646. }
  647. },
  648. destroyed: {
  649. __proto__: null,
  650. get() {
  651. return this._writableState ? this._writableState.destroyed : false
  652. },
  653. set(value) {
  654. // Backward compatibility, the user is explicitly managing destroyed.
  655. if (this._writableState) {
  656. this._writableState.destroyed = value
  657. }
  658. }
  659. },
  660. writable: {
  661. __proto__: null,
  662. get() {
  663. const w = this._writableState
  664. // w.writable === false means that this is part of a Duplex stream
  665. // where the writable side was disabled upon construction.
  666. // Compat. The user might manually disable writable side through
  667. // deprecated setter.
  668. return !!w && w.writable !== false && !w.destroyed && !w.errored && !w.ending && !w.ended
  669. },
  670. set(val) {
  671. // Backwards compatible.
  672. if (this._writableState) {
  673. this._writableState.writable = !!val
  674. }
  675. }
  676. },
  677. writableFinished: {
  678. __proto__: null,
  679. get() {
  680. return this._writableState ? this._writableState.finished : false
  681. }
  682. },
  683. writableObjectMode: {
  684. __proto__: null,
  685. get() {
  686. return this._writableState ? this._writableState.objectMode : false
  687. }
  688. },
  689. writableBuffer: {
  690. __proto__: null,
  691. get() {
  692. return this._writableState && this._writableState.getBuffer()
  693. }
  694. },
  695. writableEnded: {
  696. __proto__: null,
  697. get() {
  698. return this._writableState ? this._writableState.ending : false
  699. }
  700. },
  701. writableNeedDrain: {
  702. __proto__: null,
  703. get() {
  704. const wState = this._writableState
  705. if (!wState) return false
  706. return !wState.destroyed && !wState.ending && wState.needDrain
  707. }
  708. },
  709. writableHighWaterMark: {
  710. __proto__: null,
  711. get() {
  712. return this._writableState && this._writableState.highWaterMark
  713. }
  714. },
  715. writableCorked: {
  716. __proto__: null,
  717. get() {
  718. return this._writableState ? this._writableState.corked : 0
  719. }
  720. },
  721. writableLength: {
  722. __proto__: null,
  723. get() {
  724. return this._writableState && this._writableState.length
  725. }
  726. },
  727. errored: {
  728. __proto__: null,
  729. enumerable: false,
  730. get() {
  731. return this._writableState ? this._writableState.errored : null
  732. }
  733. },
  734. writableAborted: {
  735. __proto__: null,
  736. enumerable: false,
  737. get: function () {
  738. return !!(
  739. this._writableState.writable !== false &&
  740. (this._writableState.destroyed || this._writableState.errored) &&
  741. !this._writableState.finished
  742. )
  743. }
  744. }
  745. })
  746. const destroy = destroyImpl.destroy
  747. Writable.prototype.destroy = function (err, cb) {
  748. const state = this._writableState
  749. // Invoke pending callbacks.
  750. if (!state.destroyed && (state.bufferedIndex < state.buffered.length || state[kOnFinished].length)) {
  751. process.nextTick(errorBuffer, state)
  752. }
  753. destroy.call(this, err, cb)
  754. return this
  755. }
  756. Writable.prototype._undestroy = destroyImpl.undestroy
  757. Writable.prototype._destroy = function (err, cb) {
  758. cb(err)
  759. }
  760. Writable.prototype[EE.captureRejectionSymbol] = function (err) {
  761. this.destroy(err)
  762. }
  763. let webStreamsAdapters
  764. // Lazy to avoid circular references
  765. function lazyWebStreams() {
  766. if (webStreamsAdapters === undefined) webStreamsAdapters = {}
  767. return webStreamsAdapters
  768. }
  769. Writable.fromWeb = function (writableStream, options) {
  770. return lazyWebStreams().newStreamWritableFromWritableStream(writableStream, options)
  771. }
  772. Writable.toWeb = function (streamWritable) {
  773. return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable)
  774. }