| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819 |
- // Copyright Joyent, Inc. and other Node contributors.
- //
- // Permission is hereby granted, free of charge, to any person obtaining a
- // copy of this software and associated documentation files (the
- // "Software"), to deal in the Software without restriction, including
- // without limitation the rights to use, copy, modify, merge, publish,
- // distribute, sublicense, and/or sell copies of the Software, and to permit
- // persons to whom the Software is furnished to do so, subject to the
- // following conditions:
- //
- // The above copyright notice and this permission notice shall be included
- // in all copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
- // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
- // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
- // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
- // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
- // USE OR OTHER DEALINGS IN THE SOFTWARE.
- // A bit simpler than readable streams.
- // Implement an async ._write(chunk, encoding, cb), and it'll handle all
- // the drain event emission and buffering.
- 'use strict'
- /* replacement start */
- const process = require('process/')
- /* replacement end */
- const {
- ArrayPrototypeSlice,
- Error,
- FunctionPrototypeSymbolHasInstance,
- ObjectDefineProperty,
- ObjectDefineProperties,
- ObjectSetPrototypeOf,
- StringPrototypeToLowerCase,
- Symbol,
- SymbolHasInstance
- } = require('../../ours/primordials')
- module.exports = Writable
- Writable.WritableState = WritableState
- const { EventEmitter: EE } = require('events')
- const Stream = require('./legacy').Stream
- const { Buffer } = require('buffer')
- const destroyImpl = require('./destroy')
- const { addAbortSignal } = require('./add-abort-signal')
- const { getHighWaterMark, getDefaultHighWaterMark } = require('./state')
- const {
- ERR_INVALID_ARG_TYPE,
- ERR_METHOD_NOT_IMPLEMENTED,
- ERR_MULTIPLE_CALLBACK,
- ERR_STREAM_CANNOT_PIPE,
- ERR_STREAM_DESTROYED,
- ERR_STREAM_ALREADY_FINISHED,
- ERR_STREAM_NULL_VALUES,
- ERR_STREAM_WRITE_AFTER_END,
- ERR_UNKNOWN_ENCODING
- } = require('../../ours/errors').codes
- const { errorOrDestroy } = destroyImpl
- ObjectSetPrototypeOf(Writable.prototype, Stream.prototype)
- ObjectSetPrototypeOf(Writable, Stream)
/**
 * Shared do-nothing callback. It stands in for a missing write callback, and
 * its identity is compared against to detect "no user callback supplied".
 */
function nop() {}

// Key under which a WritableState keeps the callbacks handed to end() that
// are still waiting for the stream to finish.
const kOnFinished = Symbol('kOnFinished')
/**
 * Bookkeeping record for the writable side of a stream.
 *
 * @param {object} [options] Stream options. A duplex passes the same object
 *   to both of its sides.
 * @param {object} stream The stream owning this state.
 * @param {boolean} [isDuplex] Whether `stream` is a duplex. When this is not
 *   a boolean it is detected with an `instanceof` check against `./duplex`.
 */
function WritableState(options, stream, isDuplex) {
  // The writable-specific option names (writableObjectMode,
  // writableHighWaterMark) only apply to the writable half of a duplex,
  // so we first need to know whether we are one.
  if (typeof isDuplex !== 'boolean') {
    isDuplex = stream instanceof require('./duplex')
  }

  // True when chunks are arbitrary objects instead of strings/buffers.
  this.objectMode = Boolean(options && options.objectMode)
  if (isDuplex && !this.objectMode) {
    this.objectMode = Boolean(options && options.writableObjectMode)
  }

  // Threshold at which write() starts returning false. Zero is legal and
  // means write() reports false whenever anything is left unflushed.
  if (options) {
    this.highWaterMark = getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex)
  } else {
    this.highWaterMark = getDefaultHighWaterMark(false)
  }

  // Set once _final has been invoked.
  this.finalCalled = false

  // Set when a 'drain' event is owed to the producer.
  this.needDrain = false

  // Set as soon as end() starts.
  this.ending = false

  // Set once end() has run to completion.
  this.ended = false

  // Set when 'finish' is emitted.
  this.finished = false

  // Set when the stream has been destroyed.
  this.destroyed = false

  // Strings are converted to buffers before reaching _write unless the
  // caller opted out with `decodeStrings: false`.
  this.decodeStrings = !(options && options.decodeStrings === false)

  // Encoding used when write() is not given one; configurable because
  // not every consumer wants 'utf8'.
  this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'

  // Amount of data accepted but not yet acknowledged by _write. This is
  // a counter, not the buffer itself.
  this.length = 0

  // True while a _write/_writev call is outstanding.
  this.writing = false

  // Cork depth; anything above zero buffers writes until uncork().
  this.corked = 0

  // True while we are inside a call that may invoke the write callback
  // synchronously. Starts out true so that "later" work is never run
  // before the first write.
  this.sync = true

  // True while clearBuffer() is draining queued chunks, to prevent
  // re-entrant draining from a synchronous write callback.
  this.bufferProcessing = false

  // Completion callback handed to _write/_writev.
  this.onwrite = onwrite.bind(undefined, stream)

  // User callback belonging to the write currently in flight.
  this.writecb = null

  // Size of the write currently in flight.
  this.writelen = 0

  // Pending afterWrite() batch for writes that completed synchronously.
  this.afterWriteTickInfo = null

  resetBuffer(this)

  // User write callbacks not yet invoked; 'finish' waits for this to
  // reach zero.
  this.pendingcb = 0

  // Async construction is opt-in, so a stream starts out constructed.
  this.constructed = true

  // Set once 'prefinish' has been emitted.
  this.prefinished = false

  // Set once an error has been emitted, so it is not surfaced twice.
  this.errorEmitted = false

  // Emit 'close' on destroy unless explicitly disabled.
  this.emitClose = options ? options.emitClose !== false : true

  // Call destroy() after 'finish' unless explicitly disabled.
  this.autoDestroy = options ? options.autoDestroy !== false : true

  // The error that failed the stream, if any. Needed because with
  // autoDestroy off there is no other record of the failure.
  this.errored = null

  // Set when destruction has completed.
  this.closed = false

  // Set when 'close' has been (or would have been) emitted.
  this.closeEmitted = false

  // Callbacks passed to end() that are waiting for the stream to finish.
  this[kOnFinished] = []
}
/**
 * Puts the queue of buffered writes back into its empty state.
 *
 * @param {object} state State object whose queue fields are (re)initialised.
 */
function resetBuffer(state) {
  // Fresh, empty queue with the read cursor at its start.
  state.buffered = []
  state.bufferedIndex = 0

  // Both flags hold trivially for an empty queue; writeOrBuffer() clears
  // them when a queued entry breaks the property.
  state.allBuffers = true
  state.allNoop = true
}
/**
 * Returns a copy of the queued write entries that have not been handed to
 * the underlying write yet (everything from `bufferedIndex` onwards).
 *
 * @returns {Array} A new array; mutating it does not affect the queue.
 */
WritableState.prototype.getBuffer = function getBuffer() {
  const { buffered, bufferedIndex } = this
  return ArrayPrototypeSlice(buffered, bufferedIndex)
}
// Number of queued write entries still waiting to be processed, derived
// from the queue length and the read cursor.
ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
  __proto__: null,
  get() {
    const queued = this.buffered.length
    return queued - this.bufferedIndex
  }
})
/**
 * Writable stream constructor. Also applied to duplex instances, and usable
 * without `new`.
 *
 * @param {object} [options] Stream options. The function-valued `write`,
 *   `writev`, `destroy`, `final` and `construct` entries are installed as the
 *   corresponding underscore-prefixed methods; `signal` is wired up through
 *   addAbortSignal().
 */
function Writable(options) {
  // The duplex check is done here (and passed down) rather than inside
  // WritableState. The Symbol.hasInstance override on Writable cannot be
  // used for the `new`-less guard: at this point no `_writableState` exists
  // yet, and a custom `_writableState` getter could recurse. The plain
  // prototype-chain check is used instead.
  const isDuplex = this instanceof require('./duplex')
  const calledWithoutNew = !isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)
  if (calledWithoutNew) {
    return new Writable(options)
  }

  this._writableState = new WritableState(options, this, isDuplex)

  if (options) {
    // Simplified-construction hooks: only functions are accepted.
    if (typeof options.write === 'function') {
      this._write = options.write
    }
    if (typeof options.writev === 'function') {
      this._writev = options.writev
    }
    if (typeof options.destroy === 'function') {
      this._destroy = options.destroy
    }
    if (typeof options.final === 'function') {
      this._final = options.final
    }
    if (typeof options.construct === 'function') {
      this._construct = options.construct
    }
    if (options.signal) {
      addAbortSignal(options.signal, this)
    }
  }

  Stream.call(this, options)

  // Once construction has completed, flush whatever was queued in the
  // meantime and check whether the stream can already finish.
  destroyImpl.construct(this, () => {
    const state = this._writableState
    if (!state.writing) {
      clearBuffer(this, state)
    }
    finishMaybe(this, state)
  })
}
// Custom `instanceof` for Writable: besides the regular prototype-chain
// check, any object carrying a WritableState in `_writableState` counts as a
// Writable. Subclasses only get the regular prototype-chain check.
ObjectDefineProperty(Writable, SymbolHasInstance, {
  __proto__: null,
  value: function (object) {
    const inPrototypeChain = FunctionPrototypeSymbolHasInstance(this, object)
    if (inPrototypeChain) {
      return true
    }
    // Duck typing is reserved for `x instanceof Writable` itself.
    if (this !== Writable) {
      return false
    }
    return object && object._writableState instanceof WritableState
  }
})
// A writable cannot be piped from. The inherited pipe() is overridden so
// that attempting it reports ERR_STREAM_CANNOT_PIPE through errorOrDestroy().
Writable.prototype.pipe = function () {
  const err = new ERR_STREAM_CANNOT_PIPE()
  errorOrDestroy(this, err)
}
/**
 * Shared implementation behind write() and end(chunk).
 *
 * Normalises the arguments, validates and converts the chunk, then either
 * reports a write-after-end / destroyed error or hands the chunk to
 * writeOrBuffer().
 *
 * @param {object} stream The writable stream.
 * @param {*} chunk Data to write.
 * @param {string|Function} [encoding] Encoding, or the callback.
 * @param {Function} [cb] Completion callback.
 * @returns {boolean|Error} The writeOrBuffer() result, or the error object
 *   when the write was rejected because the stream is ending or destroyed.
 * @throws ERR_UNKNOWN_ENCODING, ERR_STREAM_NULL_VALUES, ERR_INVALID_ARG_TYPE
 */
function _write(stream, chunk, encoding, cb) {
  const state = stream._writableState

  // Argument shuffling: (chunk, cb) and (chunk, encoding, cb) are both valid.
  if (typeof encoding === 'function') {
    cb = encoding
    encoding = state.defaultEncoding
  } else {
    if (!encoding) {
      encoding = state.defaultEncoding
    } else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) {
      throw new ERR_UNKNOWN_ENCODING(encoding)
    }
    if (typeof cb !== 'function') {
      cb = nop
    }
  }

  if (chunk === null) {
    throw new ERR_STREAM_NULL_VALUES()
  }

  // Outside object mode only strings, Buffers and Uint8Arrays are accepted;
  // everything that ends up as a Buffer is tagged with encoding 'buffer'.
  if (!state.objectMode) {
    if (typeof chunk === 'string') {
      if (state.decodeStrings !== false) {
        chunk = Buffer.from(chunk, encoding)
        encoding = 'buffer'
      }
    } else if (chunk instanceof Buffer) {
      encoding = 'buffer'
    } else if (Stream._isUint8Array(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk)
      encoding = 'buffer'
    } else {
      throw new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk)
    }
  }

  // "Ending" takes precedence over "destroyed" when both apply.
  const rejection = state.ending
    ? new ERR_STREAM_WRITE_AFTER_END()
    : state.destroyed
      ? new ERR_STREAM_DESTROYED('write')
      : undefined

  if (rejection) {
    // The callback is scheduled first, then the stream is told.
    process.nextTick(cb, rejection)
    errorOrDestroy(stream, rejection, true)
    return rejection
  }

  state.pendingcb++
  return writeOrBuffer(stream, state, chunk, encoding, cb)
}
/**
 * Writes a chunk to the stream.
 *
 * @returns {boolean} `true` only when the internal write reported `true`;
 *   any other result (including a returned error object) yields `false`.
 */
Writable.prototype.write = function (chunk, encoding, cb) {
  const outcome = _write(this, chunk, encoding, cb)
  return outcome === true
}
/**
 * Increases the cork depth by one. While the depth is non-zero, writes are
 * queued instead of being passed to the underlying write.
 */
Writable.prototype.cork = function () {
  const state = this._writableState
  state.corked++
}
/**
 * Decreases the cork depth by one (never below zero) and, unless a write is
 * currently in flight, asks clearBuffer() to process the queue.
 */
Writable.prototype.uncork = function () {
  const state = this._writableState
  if (!state.corked) {
    // Not corked: nothing to undo.
    return
  }
  state.corked--
  if (!state.writing) {
    clearBuffer(this, state)
  }
}
/**
 * Changes the encoding used for string chunks written without an explicit
 * encoding.
 *
 * @param {string} encoding Encoding name; matched case-insensitively.
 * @returns {this} The stream, for chaining.
 * @throws ERR_UNKNOWN_ENCODING when `Buffer.isEncoding` rejects the value.
 */
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  // Stored in lower case (the original note says node::ParseEncoding()
  // requires it).
  const normalized = typeof encoding === 'string' ? StringPrototypeToLowerCase(encoding) : encoding
  if (!Buffer.isEncoding(normalized)) {
    throw new ERR_UNKNOWN_ENCODING(normalized)
  }
  this._writableState.defaultEncoding = normalized
  return this
}
/**
 * Accounts for a chunk and either queues it or passes it straight to
 * `stream._write`.
 *
 * The chunk is queued when a write is already in flight, the stream is
 * corked, has errored, or is not constructed yet.
 *
 * @returns {boolean} `false` when the caller should stop writing: the high
 *   water mark has been reached, or the stream has errored or been destroyed.
 */
function writeOrBuffer(stream, state, chunk, encoding, callback) {
  const len = state.objectMode ? 1 : chunk.length
  state.length += len

  // Computed before calling _write, which may complete synchronously and
  // reduce state.length again.
  const ret = state.length < state.highWaterMark

  // Only ever switch needDrain on here; an earlier request for 'drain'
  // must not be cancelled by a later small write.
  if (!ret) {
    state.needDrain = true
  }

  const mustQueue = state.writing || state.corked || state.errored || !state.constructed
  if (mustQueue) {
    state.buffered.push({ chunk, encoding, callback })
    // Track whether the queue is still "all buffers" / "all no-op
    // callbacks"; clearBuffer() relies on both flags.
    if (state.allBuffers && encoding !== 'buffer') {
      state.allBuffers = false
    }
    if (state.allNoop && callback !== nop) {
      state.allNoop = false
    }
  } else {
    state.writelen = len
    state.writecb = callback
    state.writing = true
    state.sync = true
    stream._write(chunk, encoding, state.onwrite)
    state.sync = false
  }

  // Report false on error/destroy so that synchronous
  // `while (stream.write(data))` loops terminate.
  if (state.errored || state.destroyed) {
    return false
  }
  return ret
}
/**
 * Starts one underlying write for queued data.
 *
 * @param {object} stream The writable stream.
 * @param {object} state Its writable state.
 * @param {boolean} writev Use `stream._writev` instead of `stream._write`.
 * @param {number} len Amount accounted for this write.
 * @param {*} chunk The chunk, or the array of entries for `_writev`.
 * @param {string} encoding Encoding passed to `_write`.
 * @param {Function} cb Callback stored as `state.writecb`.
 */
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len
  state.writecb = cb
  state.writing = true

  // `sync` is raised for the duration of the call so the completion
  // handler can tell whether it was invoked synchronously.
  state.sync = true
  if (state.destroyed) {
    // Never reach the implementation on a destroyed stream; fail the
    // write through the regular completion path instead.
    state.onwrite(new ERR_STREAM_DESTROYED('write'))
  } else if (writev) {
    stream._writev(chunk, state.onwrite)
  } else {
    stream._write(chunk, encoding, state.onwrite)
  }
  state.sync = false
}
/**
 * Failure path of a write: settles the failed write's callback, fails the
 * queued writes, then reports the error on the stream.
 */
function onwriteError(stream, state, er, cb) {
  state.pendingcb--
  cb(er)

  // Queued writes must get their callbacks even when autoDestroy is off.
  // They are not given `er` directly, as it belongs to one specific write.
  errorBuffer(state)

  // Last on purpose: an emitted 'error' has to come after the callback.
  errorOrDestroy(stream, er)
}
/**
 * Completion handler passed to `_write` / `_writev` (bound to the stream).
 *
 * @param {object} stream The writable stream.
 * @param {Error} [er] Error reported by the implementation, if any.
 */
function onwrite(stream, er) {
  const state = stream._writableState
  const { sync, writecb: cb } = state

  // writecb is cleared below, so a missing callback here means the
  // implementation invoked its callback more than once.
  if (typeof cb !== 'function') {
    errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK())
    return
  }

  state.writing = false
  state.writecb = null
  state.length -= state.writelen
  state.writelen = 0

  if (er) {
    // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
    er.stack // eslint-disable-line no-unused-expressions

    if (!state.errored) {
      state.errored = er
    }

    // A duplex also records the error on its readable side.
    const rState = stream._readableState
    if (rState && !rState.errored) {
      rState.errored = er
    }

    // A synchronous failure is deferred by one tick.
    if (sync) {
      process.nextTick(onwriteError, stream, state, er, cb)
    } else {
      onwriteError(stream, state, er, cb)
    }
    return
  }

  // Keep the queue moving before notifying the user.
  if (state.buffered.length > state.bufferedIndex) {
    clearBuffer(stream, state)
  }

  if (!sync) {
    afterWrite(stream, state, 1, cb)
    return
  }

  // Synchronous completions are batched: as long as the same callback keeps
  // being used, the already scheduled tick just gets its counter bumped
  // instead of scheduling (and allocating) a new one.
  const pendingTick = state.afterWriteTickInfo
  if (pendingTick !== null && pendingTick.cb === cb) {
    pendingTick.count++
  } else {
    state.afterWriteTickInfo = {
      count: 1,
      cb,
      stream,
      state
    }
    process.nextTick(afterWriteTick, state.afterWriteTickInfo)
  }
}
/**
 * Next-tick entry point for a batch of synchronously completed writes.
 * Clears the pending batch record and forwards its contents to afterWrite().
 *
 * @param {{stream: object, state: object, count: number, cb: Function}} info
 */
function afterWriteTick(info) {
  const { stream, state, count, cb } = info
  // From here on a new batch has to be started.
  state.afterWriteTickInfo = null
  return afterWrite(stream, state, count, cb)
}
/**
 * Runs after one or more writes completed successfully: emits 'drain' when
 * owed, invokes the user callback once per completed write, and checks
 * whether the stream can finish.
 *
 * @param {object} stream The writable stream.
 * @param {object} state Its writable state.
 * @param {number} count How many completed writes `cb` stands for.
 * @param {Function} cb The user callback.
 */
function afterWrite(stream, state, count, cb) {
  // 'drain' is emitted only once everything has been flushed, and never
  // while ending or after destruction.
  const shouldDrain = !state.ending && !stream.destroyed && state.length === 0 && state.needDrain
  if (shouldDrain) {
    state.needDrain = false
    stream.emit('drain')
  }

  for (let remaining = count; remaining > 0; remaining--) {
    state.pendingcb--
    cb()
  }

  // A callback may have destroyed the stream; fail what is still queued.
  if (state.destroyed) {
    errorBuffer(state)
  }

  finishMaybe(stream, state)
}
/**
 * Fails everything that is still waiting: each queued write callback and
 * each pending end() callback is invoked with the stream's error, or with a
 * fresh ERR_STREAM_DESTROYED when no error was recorded. The queue is then
 * reset. Does nothing while a write is in flight.
 */
function errorBuffer(state) {
  if (state.writing) {
    return
  }

  for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
    const { chunk, callback } = state.buffered[n]
    // Give back the length that was accounted for this chunk.
    state.length -= state.objectMode ? 1 : chunk.length

    const recorded = state.errored
    const reason = recorded !== null && recorded !== undefined ? recorded : new ERR_STREAM_DESTROYED('write')
    callback(reason)
  }

  // splice(0) empties the list up front, so no callback can be invoked twice.
  const onfinishCallbacks = state[kOnFinished].splice(0)
  for (let i = 0; i < onfinishCallbacks.length; i++) {
    const recorded = state.errored
    const reason = recorded !== null && recorded !== undefined ? recorded : new ERR_STREAM_DESTROYED('end')
    onfinishCallbacks[i](reason)
  }

  resetBuffer(state)
}
/**
 * Processes queued writes, if any.
 *
 * With more than one entry queued and a `_writev` implementation available,
 * the whole queue goes out as a single vectored write. Otherwise entries
 * are written one by one until a write stays in flight.
 */
function clearBuffer(stream, state) {
  const blocked = state.corked || state.bufferProcessing || state.destroyed || !state.constructed
  if (blocked) {
    return
  }

  const { buffered, bufferedIndex, objectMode } = state
  const bufferedLength = buffered.length - bufferedIndex
  if (!bufferedLength) {
    return
  }

  state.bufferProcessing = true

  if (bufferedLength > 1 && stream._writev) {
    const start = bufferedIndex

    // The batch is settled by a single callback, so it counts as one
    // pending callback instead of one per entry.
    state.pendingcb -= bufferedLength - 1

    const callback = state.allNoop
      ? nop
      : (err) => {
          for (let n = start; n < buffered.length; ++n) {
            buffered[n].callback(err)
          }
        }

    // The queue array itself is handed over only when nothing else needs
    // it (all callbacks are no-ops and it has no consumed prefix);
    // otherwise the implementation gets a copy.
    const chunks = state.allNoop && start === 0 ? buffered : ArrayPrototypeSlice(buffered, start)
    chunks.allBuffers = state.allBuffers

    doWrite(stream, state, true, state.length, chunks, '', callback)
    resetBuffer(state)
  } else {
    let cursor = bufferedIndex
    do {
      const { chunk, encoding, callback } = buffered[cursor]
      // Drop the reference to the consumed entry.
      buffered[cursor++] = null
      const len = objectMode ? 1 : chunk.length
      doWrite(stream, state, false, len, chunk, encoding, callback)
    } while (cursor < buffered.length && !state.writing)

    if (cursor === buffered.length) {
      // Everything consumed: start over with an empty queue.
      resetBuffer(state)
    } else if (cursor > 256) {
      // Compact once the consumed prefix has grown large.
      buffered.splice(0, cursor)
      state.bufferedIndex = 0
    } else {
      state.bufferedIndex = cursor
    }
  }

  state.bufferProcessing = false
}
/**
 * Default `_write`: when the stream provides `_writev`, the single chunk is
 * forwarded to it as a one-element batch; otherwise the implementation is
 * missing.
 *
 * @throws ERR_METHOD_NOT_IMPLEMENTED when there is no `_writev` either.
 */
Writable.prototype._write = function (chunk, encoding, cb) {
  if (!this._writev) {
    throw new ERR_METHOD_NOT_IMPLEMENTED('_write()')
  }
  this._writev([{ chunk, encoding }], cb)
}

// No vectored write by default.
Writable.prototype._writev = null
/**
 * Signals that no more data will be written, optionally writing one last
 * chunk first.
 *
 * @param {*} [chunk] Final chunk, or the callback.
 * @param {string|Function} [encoding] Encoding of `chunk`, or the callback.
 * @param {Function} [cb] Invoked once the stream has finished, or with an
 *   error when it cannot finish.
 * @returns {this}
 */
Writable.prototype.end = function (chunk, encoding, cb) {
  const state = this._writableState

  // Accept end(cb) and end(chunk, cb) as well.
  if (typeof chunk === 'function') {
    cb = chunk
    chunk = null
    encoding = null
  } else if (typeof encoding === 'function') {
    cb = encoding
    encoding = null
  }

  let err
  if (chunk !== null && chunk !== undefined) {
    // _write() hands back the error object when it rejected the chunk.
    const ret = _write(this, chunk, encoding)
    if (ret instanceof Error) {
      err = ret
    }
  }

  // end() removes every level of corking at once.
  if (state.corked) {
    state.corked = 1
    this.uncork()
  }

  if (!err) {
    if (!state.errored && !state.ending) {
      // Repeated or unnecessary end() calls are tolerated on purpose: they
      // are usually harmless, and a hard error would be disproportionate,
      // since callers cannot always tell whether end() is still needed.
      state.ending = true
      finishMaybe(this, state, true)
      state.ended = true
    } else if (state.finished) {
      err = new ERR_STREAM_ALREADY_FINISHED('end')
    } else if (state.destroyed) {
      err = new ERR_STREAM_DESTROYED('end')
    }
  }

  if (typeof cb === 'function') {
    if (err || state.finished) {
      process.nextTick(cb, err)
    } else {
      // Parked until the stream finishes or fails.
      state[kOnFinished].push(cb)
    }
  }

  return this
}
/**
 * Tells whether the stream is in a position to finish: end() was called,
 * the stream is alive and constructed, nothing is queued or in flight, and
 * it has neither finished, errored nor closed already.
 *
 * The result is only meant to be tested for truthiness; it is the value of
 * the first failing condition, or of the last one when all hold.
 */
function needFinish(state) {
  const endRequested = state.ending && !state.destroyed && state.constructed
  if (!endRequested) {
    return endRequested
  }

  const drained = state.length === 0 && !state.errored && state.buffered.length === 0
  if (!drained) {
    return drained
  }

  return !state.finished && !state.writing && !state.errorEmitted && !state.closeEmitted
}
/**
 * Invokes `stream._final` and handles its completion callback.
 *
 * On success, and if the stream is still in a position to finish,
 * 'prefinish' is emitted and finish() is scheduled for the next tick. On
 * failure the pending end() callbacks are invoked with the error and the
 * error is reported on the stream. An exception thrown by `_final` is
 * treated as if it had been passed to the callback.
 *
 * @param {object} stream The writable stream.
 * @param {object} state Its writable state.
 */
function callFinal(stream, state) {
  let called = false

  function onFinish(err) {
    if (called) {
      // Second invocation of the _final callback. Report the error it
      // carries, or a multiple-callback error otherwise. The error has to
      // be constructed with `new`: invoking the error constructor as a
      // plain function throws a TypeError when it is an ES class, which
      // would replace the intended error.
      errorOrDestroy(stream, err !== null && err !== undefined ? err : new ERR_MULTIPLE_CALLBACK())
      return
    }
    called = true

    // Balances the increment done right before calling _final.
    state.pendingcb--

    if (err) {
      const onfinishCallbacks = state[kOnFinished].splice(0)
      for (let i = 0; i < onfinishCallbacks.length; i++) {
        onfinishCallbacks[i](err)
      }
      errorOrDestroy(stream, err, state.sync)
    } else if (needFinish(state)) {
      state.prefinished = true
      stream.emit('prefinish')
      // Backwards compat. Don't check state.sync here.
      // Some streams assume 'finish' will be emitted
      // asynchronously relative to _final callback.
      state.pendingcb++
      process.nextTick(finish, stream, state)
    }
  }

  // `sync` is raised while _final runs so that a synchronous failure can
  // be told apart from a later one.
  state.sync = true
  state.pendingcb++
  try {
    stream._final(onFinish)
  } catch (err) {
    onFinish(err)
  }
  state.sync = false
}
/**
 * Performs the pre-finish step once: streams with a `_final` method (that
 * are not destroyed) run it through callFinal(); all others get 'prefinish'
 * emitted right away.
 */
function prefinish(stream, state) {
  // Already done, or _final is already under way.
  if (state.prefinished || state.finalCalled) {
    return
  }

  const hasFinal = typeof stream._final === 'function'
  if (hasFinal && !state.destroyed) {
    state.finalCalled = true
    callFinal(stream, state)
    return
  }

  state.prefinished = true
  stream.emit('prefinish')
}
/**
 * Finishes the stream if it is ready to: runs the pre-finish step and, when
 * no callbacks are pending afterwards, calls finish().
 *
 * @param {object} stream The writable stream.
 * @param {object} state Its writable state.
 * @param {boolean} [sync] When truthy, finish() is deferred to the next tick
 *   and the readiness check is repeated there.
 */
function finishMaybe(stream, state, sync) {
  if (!needFinish(state)) {
    return
  }

  prefinish(stream, state)

  // Something is still outstanding (e.g. a write callback or _final).
  if (state.pendingcb !== 0) {
    return
  }

  if (sync) {
    // Held as a pending callback until the deferred check has run.
    state.pendingcb++
    process.nextTick(
      (deferredStream, deferredState) => {
        if (needFinish(deferredState)) {
          finish(deferredStream, deferredState)
        } else {
          deferredState.pendingcb--
        }
      },
      stream,
      state
    )
    return
  }

  // prefinish() may have changed the picture, so check again.
  if (needFinish(state)) {
    // finish() decrements pendingcb itself.
    state.pendingcb++
    finish(stream, state)
  }
}
/**
 * Marks the stream as finished: releases the pending-callback slot taken by
 * the caller, invokes the pending end() callbacks, emits 'finish' and, with
 * autoDestroy enabled, destroys the stream when appropriate.
 */
function finish(stream, state) {
  state.pendingcb--
  state.finished = true

  const onfinishCallbacks = state[kOnFinished].splice(0)
  for (let i = 0; i < onfinishCallbacks.length; i++) {
    onfinishCallbacks[i]()
  }

  stream.emit('finish')

  if (!state.autoDestroy) {
    return
  }

  // A duplex is only auto-destroyed once its readable side is done too:
  // it must have autoDestroy enabled and have either emitted 'end' or been
  // explicitly marked as not readable (in which case 'end' is not expected).
  const rState = stream._readableState
  const readableSideDone = !rState || (rState.autoDestroy && (rState.endEmitted || rState.readable === false))
  if (readableSideDone) {
    stream.destroy()
  }
}
// Public accessors on Writable.prototype. They are views over
// `_writableState`; every getter tolerates the state object being absent
// (for example when read directly off the prototype).
ObjectDefineProperties(Writable.prototype, {
  // Whether the stream has finished destroying.
  closed: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.closed : false
    }
  },

  // Whether the stream has been destroyed.
  destroyed: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.destroyed : false
    },
    set(value) {
      // Backward compatibility, the user is explicitly managing destroyed.
      if (this._writableState) {
        this._writableState.destroyed = value
      }
    }
  },

  // Whether the stream can still be written to.
  writable: {
    __proto__: null,
    get() {
      const w = this._writableState
      // w.writable === false means that this is part of a Duplex stream
      // where the writable side was disabled upon construction.
      // Compat. The user might manually disable writable side through
      // deprecated setter.
      return !!w && w.writable !== false && !w.destroyed && !w.errored && !w.ending && !w.ended
    },
    set(val) {
      // Backwards compatible.
      if (this._writableState) {
        this._writableState.writable = !!val
      }
    }
  },

  // Whether 'finish' has been emitted.
  writableFinished: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.finished : false
    }
  },

  writableObjectMode: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.objectMode : false
    }
  },

  // Copy of the queued, not yet processed write entries.
  writableBuffer: {
    __proto__: null,
    get() {
      return this._writableState && this._writableState.getBuffer()
    }
  },

  // Whether end() has been called (reflects the `ending` flag).
  writableEnded: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.ending : false
    }
  },

  // Whether a 'drain' event is owed; never while ending or destroyed.
  writableNeedDrain: {
    __proto__: null,
    get() {
      const wState = this._writableState
      if (!wState) return false
      return !wState.destroyed && !wState.ending && wState.needDrain
    }
  },

  writableHighWaterMark: {
    __proto__: null,
    get() {
      return this._writableState && this._writableState.highWaterMark
    }
  },

  // Current cork depth.
  writableCorked: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.corked : 0
    }
  },

  // Amount of data accepted but not yet flushed.
  writableLength: {
    __proto__: null,
    get() {
      return this._writableState && this._writableState.length
    }
  },

  // The error that failed the stream, or null.
  errored: {
    __proto__: null,
    enumerable: false,
    get() {
      return this._writableState ? this._writableState.errored : null
    }
  },

  // Whether the stream was destroyed or errored before it finished.
  writableAborted: {
    __proto__: null,
    enumerable: false,
    get: function () {
      const w = this._writableState
      // Same guard as the sibling getters: without a state object there is
      // nothing that could have been aborted, so answer false rather than
      // throwing on the property access.
      if (!w) return false
      return !!(w.writable !== false && (w.destroyed || w.errored) && !w.finished)
    }
  }
})
const destroy = destroyImpl.destroy

/**
 * Destroys the stream. If writes or end() callbacks are still waiting on a
 * stream that is not destroyed yet, they are failed on the next tick; the
 * actual teardown is delegated to the shared destroy implementation.
 *
 * @param {Error} [err] Error to destroy the stream with.
 * @param {Function} [cb] Forwarded to the shared destroy implementation.
 * @returns {this}
 */
Writable.prototype.destroy = function (err, cb) {
  const state = this._writableState

  // Invoke pending callbacks.
  if (!state.destroyed) {
    const hasWaiters = state.bufferedIndex < state.buffered.length || state[kOnFinished].length
    if (hasWaiters) {
      process.nextTick(errorBuffer, state)
    }
  }

  destroy.call(this, err, cb)
  return this
}
// Taken from the shared destroy implementation.
Writable.prototype._undestroy = destroyImpl.undestroy

/**
 * Default `_destroy`: nothing to release, so the callback is invoked
 * immediately with whatever error was passed in.
 */
Writable.prototype._destroy = function (err, cb) {
  cb(err)
}
// Handler installed under EventEmitter's captureRejectionSymbol: the error
// it receives is used to destroy the stream.
Writable.prototype[EE.captureRejectionSymbol] = function (err) {
  this.destroy(err)
}
// Cache for the web streams adapter object, filled in on first use.
let webStreamsAdapters

/**
 * Returns the web streams adapter object, creating it on first call (lazily,
 * to avoid circular references between modules).
 *
 * NOTE(review): the object created here is empty, so the adapter functions
 * that fromWeb()/toWeb() look up on it are not present in this file.
 */
function lazyWebStreams() {
  if (webStreamsAdapters === undefined) {
    webStreamsAdapters = {}
  }
  return webStreamsAdapters
}
/**
 * Wraps a web `WritableStream` in a Writable by delegating to the
 * `newStreamWritableFromWritableStream` adapter.
 *
 * @param {object} writableStream The web stream to wrap.
 * @param {object} [options] Passed through to the adapter.
 */
Writable.fromWeb = function (writableStream, options) {
  const adapters = lazyWebStreams()
  return adapters.newStreamWritableFromWritableStream(writableStream, options)
}
/**
 * Exposes a Writable as a web `WritableStream` by delegating to the
 * `newWritableStreamFromStreamWritable` adapter.
 *
 * @param {object} streamWritable The Writable to expose.
 */
Writable.toWeb = function (streamWritable) {
  const adapters = lazyWebStreams()
  return adapters.newWritableStreamFromStreamWritable(streamWritable)
}
|