| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184 |
- const { EventEmitter } = require('events-universal')
- const STREAM_DESTROYED = new Error('Stream was destroyed')
- const PREMATURE_CLOSE = new Error('Premature close')
- const FIFO = require('fast-fifo')
- const TextDecoder = require('text-decoder')
- // if we do a future major, expect queue microtask to be there always, for now a bit defensive
- const qmt = typeof queueMicrotask === 'undefined' ? fn => global.process.nextTick(fn) : queueMicrotask
- /* eslint-disable no-multi-spaces */
- // 29 bits used total (4 from shared, 14 from read, and 11 from write)
- const MAX = ((1 << 29) - 1)
- // Shared state
- const OPENING = 0b0001
- const PREDESTROYING = 0b0010
- const DESTROYING = 0b0100
- const DESTROYED = 0b1000
- const NOT_OPENING = MAX ^ OPENING
- const NOT_PREDESTROYING = MAX ^ PREDESTROYING
- // Read state (4 bit offset from shared state)
- const READ_ACTIVE = 0b00000000000001 << 4
- const READ_UPDATING = 0b00000000000010 << 4
- const READ_PRIMARY = 0b00000000000100 << 4
- const READ_QUEUED = 0b00000000001000 << 4
- const READ_RESUMED = 0b00000000010000 << 4
- const READ_PIPE_DRAINED = 0b00000000100000 << 4
- const READ_ENDING = 0b00000001000000 << 4
- const READ_EMIT_DATA = 0b00000010000000 << 4
- const READ_EMIT_READABLE = 0b00000100000000 << 4
- const READ_EMITTED_READABLE = 0b00001000000000 << 4
- const READ_DONE = 0b00010000000000 << 4
- const READ_NEXT_TICK = 0b00100000000000 << 4
- const READ_NEEDS_PUSH = 0b01000000000000 << 4
- const READ_READ_AHEAD = 0b10000000000000 << 4
- // Combined read state
- const READ_FLOWING = READ_RESUMED | READ_PIPE_DRAINED
- const READ_ACTIVE_AND_NEEDS_PUSH = READ_ACTIVE | READ_NEEDS_PUSH
- const READ_PRIMARY_AND_ACTIVE = READ_PRIMARY | READ_ACTIVE
- const READ_EMIT_READABLE_AND_QUEUED = READ_EMIT_READABLE | READ_QUEUED
- const READ_RESUMED_READ_AHEAD = READ_RESUMED | READ_READ_AHEAD
- const READ_NOT_ACTIVE = MAX ^ READ_ACTIVE
- const READ_NON_PRIMARY = MAX ^ READ_PRIMARY
- const READ_NON_PRIMARY_AND_PUSHED = MAX ^ (READ_PRIMARY | READ_NEEDS_PUSH)
- const READ_PUSHED = MAX ^ READ_NEEDS_PUSH
- const READ_PAUSED = MAX ^ READ_RESUMED
- const READ_NOT_QUEUED = MAX ^ (READ_QUEUED | READ_EMITTED_READABLE)
- const READ_NOT_ENDING = MAX ^ READ_ENDING
- const READ_PIPE_NOT_DRAINED = MAX ^ READ_FLOWING
- const READ_NOT_NEXT_TICK = MAX ^ READ_NEXT_TICK
- const READ_NOT_UPDATING = MAX ^ READ_UPDATING
- const READ_NO_READ_AHEAD = MAX ^ READ_READ_AHEAD
- const READ_PAUSED_NO_READ_AHEAD = MAX ^ READ_RESUMED_READ_AHEAD
- // Write state (18 bit offset, 4 bit offset from shared state and 14 from read state)
- const WRITE_ACTIVE = 0b00000000001 << 18
- const WRITE_UPDATING = 0b00000000010 << 18
- const WRITE_PRIMARY = 0b00000000100 << 18
- const WRITE_QUEUED = 0b00000001000 << 18
- const WRITE_UNDRAINED = 0b00000010000 << 18
- const WRITE_DONE = 0b00000100000 << 18
- const WRITE_EMIT_DRAIN = 0b00001000000 << 18
- const WRITE_NEXT_TICK = 0b00010000000 << 18
- const WRITE_WRITING = 0b00100000000 << 18
- const WRITE_FINISHING = 0b01000000000 << 18
- const WRITE_CORKED = 0b10000000000 << 18
- const WRITE_NOT_ACTIVE = MAX ^ (WRITE_ACTIVE | WRITE_WRITING)
- const WRITE_NON_PRIMARY = MAX ^ WRITE_PRIMARY
- const WRITE_NOT_FINISHING = MAX ^ (WRITE_ACTIVE | WRITE_FINISHING)
- const WRITE_DRAINED = MAX ^ WRITE_UNDRAINED
- const WRITE_NOT_QUEUED = MAX ^ WRITE_QUEUED
- const WRITE_NOT_NEXT_TICK = MAX ^ WRITE_NEXT_TICK
- const WRITE_NOT_UPDATING = MAX ^ WRITE_UPDATING
- const WRITE_NOT_CORKED = MAX ^ WRITE_CORKED
- // Combined shared state
- const ACTIVE = READ_ACTIVE | WRITE_ACTIVE
- const NOT_ACTIVE = MAX ^ ACTIVE
- const DONE = READ_DONE | WRITE_DONE
- const DESTROY_STATUS = DESTROYING | DESTROYED | PREDESTROYING
- const OPEN_STATUS = DESTROY_STATUS | OPENING
- const AUTO_DESTROY = DESTROY_STATUS | DONE
- const NON_PRIMARY = WRITE_NON_PRIMARY & READ_NON_PRIMARY
- const ACTIVE_OR_TICKING = WRITE_NEXT_TICK | READ_NEXT_TICK
- const TICKING = ACTIVE_OR_TICKING & NOT_ACTIVE
- const IS_OPENING = OPEN_STATUS | TICKING
- // Combined shared state and read state
- const READ_PRIMARY_STATUS = OPEN_STATUS | READ_ENDING | READ_DONE
- const READ_STATUS = OPEN_STATUS | READ_DONE | READ_QUEUED
- const READ_ENDING_STATUS = OPEN_STATUS | READ_ENDING | READ_QUEUED
- const READ_READABLE_STATUS = OPEN_STATUS | READ_EMIT_READABLE | READ_QUEUED | READ_EMITTED_READABLE
- const SHOULD_NOT_READ = OPEN_STATUS | READ_ACTIVE | READ_ENDING | READ_DONE | READ_NEEDS_PUSH | READ_READ_AHEAD
- const READ_BACKPRESSURE_STATUS = DESTROY_STATUS | READ_ENDING | READ_DONE
- const READ_UPDATE_SYNC_STATUS = READ_UPDATING | OPEN_STATUS | READ_NEXT_TICK | READ_PRIMARY
- const READ_NEXT_TICK_OR_OPENING = READ_NEXT_TICK | OPENING
- // Combined write state
- const WRITE_PRIMARY_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_DONE
- const WRITE_QUEUED_AND_UNDRAINED = WRITE_QUEUED | WRITE_UNDRAINED
- const WRITE_QUEUED_AND_ACTIVE = WRITE_QUEUED | WRITE_ACTIVE
- const WRITE_DRAIN_STATUS = WRITE_QUEUED | WRITE_UNDRAINED | OPEN_STATUS | WRITE_ACTIVE
- const WRITE_STATUS = OPEN_STATUS | WRITE_ACTIVE | WRITE_QUEUED | WRITE_CORKED
- const WRITE_PRIMARY_AND_ACTIVE = WRITE_PRIMARY | WRITE_ACTIVE
- const WRITE_ACTIVE_AND_WRITING = WRITE_ACTIVE | WRITE_WRITING
- const WRITE_FINISHING_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_QUEUED_AND_ACTIVE | WRITE_DONE
- const WRITE_BACKPRESSURE_STATUS = WRITE_UNDRAINED | DESTROY_STATUS | WRITE_FINISHING | WRITE_DONE
- const WRITE_UPDATE_SYNC_STATUS = WRITE_UPDATING | OPEN_STATUS | WRITE_NEXT_TICK | WRITE_PRIMARY
- const WRITE_DROP_DATA = WRITE_FINISHING | WRITE_DONE | DESTROY_STATUS
- const asyncIterator = Symbol.asyncIterator || Symbol('asyncIterator')
- class WritableState {
- constructor (stream, { highWaterMark = 16384, map = null, mapWritable, byteLength, byteLengthWritable } = {}) {
- this.stream = stream
- this.queue = new FIFO()
- this.highWaterMark = highWaterMark
- this.buffered = 0
- this.error = null
- this.pipeline = null
- this.drains = null // if we add more seldomly used helpers we might them into a subobject so its a single ptr
- this.byteLength = byteLengthWritable || byteLength || defaultByteLength
- this.map = mapWritable || map
- this.afterWrite = afterWrite.bind(this)
- this.afterUpdateNextTick = updateWriteNT.bind(this)
- }
- get ended () {
- return (this.stream._duplexState & WRITE_DONE) !== 0
- }
- push (data) {
- if ((this.stream._duplexState & WRITE_DROP_DATA) !== 0) return false
- if (this.map !== null) data = this.map(data)
- this.buffered += this.byteLength(data)
- this.queue.push(data)
- if (this.buffered < this.highWaterMark) {
- this.stream._duplexState |= WRITE_QUEUED
- return true
- }
- this.stream._duplexState |= WRITE_QUEUED_AND_UNDRAINED
- return false
- }
- shift () {
- const data = this.queue.shift()
- this.buffered -= this.byteLength(data)
- if (this.buffered === 0) this.stream._duplexState &= WRITE_NOT_QUEUED
- return data
- }
- end (data) {
- if (typeof data === 'function') this.stream.once('finish', data)
- else if (data !== undefined && data !== null) this.push(data)
- this.stream._duplexState = (this.stream._duplexState | WRITE_FINISHING) & WRITE_NON_PRIMARY
- }
- autoBatch (data, cb) {
- const buffer = []
- const stream = this.stream
- buffer.push(data)
- while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED_AND_ACTIVE) {
- buffer.push(stream._writableState.shift())
- }
- if ((stream._duplexState & OPEN_STATUS) !== 0) return cb(null)
- stream._writev(buffer, cb)
- }
- update () {
- const stream = this.stream
- stream._duplexState |= WRITE_UPDATING
- do {
- while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED) {
- const data = this.shift()
- stream._duplexState |= WRITE_ACTIVE_AND_WRITING
- stream._write(data, this.afterWrite)
- }
- if ((stream._duplexState & WRITE_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
- } while (this.continueUpdate() === true)
- stream._duplexState &= WRITE_NOT_UPDATING
- }
- updateNonPrimary () {
- const stream = this.stream
- if ((stream._duplexState & WRITE_FINISHING_STATUS) === WRITE_FINISHING) {
- stream._duplexState = stream._duplexState | WRITE_ACTIVE
- stream._final(afterFinal.bind(this))
- return
- }
- if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {
- if ((stream._duplexState & ACTIVE_OR_TICKING) === 0) {
- stream._duplexState |= ACTIVE
- stream._destroy(afterDestroy.bind(this))
- }
- return
- }
- if ((stream._duplexState & IS_OPENING) === OPENING) {
- stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
- stream._open(afterOpen.bind(this))
- }
- }
- continueUpdate () {
- if ((this.stream._duplexState & WRITE_NEXT_TICK) === 0) return false
- this.stream._duplexState &= WRITE_NOT_NEXT_TICK
- return true
- }
- updateCallback () {
- if ((this.stream._duplexState & WRITE_UPDATE_SYNC_STATUS) === WRITE_PRIMARY) this.update()
- else this.updateNextTick()
- }
- updateNextTick () {
- if ((this.stream._duplexState & WRITE_NEXT_TICK) !== 0) return
- this.stream._duplexState |= WRITE_NEXT_TICK
- if ((this.stream._duplexState & WRITE_UPDATING) === 0) qmt(this.afterUpdateNextTick)
- }
- }
- class ReadableState {
- constructor (stream, { highWaterMark = 16384, map = null, mapReadable, byteLength, byteLengthReadable } = {}) {
- this.stream = stream
- this.queue = new FIFO()
- this.highWaterMark = highWaterMark === 0 ? 1 : highWaterMark
- this.buffered = 0
- this.readAhead = highWaterMark > 0
- this.error = null
- this.pipeline = null
- this.byteLength = byteLengthReadable || byteLength || defaultByteLength
- this.map = mapReadable || map
- this.pipeTo = null
- this.afterRead = afterRead.bind(this)
- this.afterUpdateNextTick = updateReadNT.bind(this)
- }
- get ended () {
- return (this.stream._duplexState & READ_DONE) !== 0
- }
- pipe (pipeTo, cb) {
- if (this.pipeTo !== null) throw new Error('Can only pipe to one destination')
- if (typeof cb !== 'function') cb = null
- this.stream._duplexState |= READ_PIPE_DRAINED
- this.pipeTo = pipeTo
- this.pipeline = new Pipeline(this.stream, pipeTo, cb)
- if (cb) this.stream.on('error', noop) // We already error handle this so supress crashes
- if (isStreamx(pipeTo)) {
- pipeTo._writableState.pipeline = this.pipeline
- if (cb) pipeTo.on('error', noop) // We already error handle this so supress crashes
- pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline)) // TODO: just call finished from pipeTo itself
- } else {
- const onerror = this.pipeline.done.bind(this.pipeline, pipeTo)
- const onclose = this.pipeline.done.bind(this.pipeline, pipeTo, null) // onclose has a weird bool arg
- pipeTo.on('error', onerror)
- pipeTo.on('close', onclose)
- pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline))
- }
- pipeTo.on('drain', afterDrain.bind(this))
- this.stream.emit('piping', pipeTo)
- pipeTo.emit('pipe', this.stream)
- }
- push (data) {
- const stream = this.stream
- if (data === null) {
- this.highWaterMark = 0
- stream._duplexState = (stream._duplexState | READ_ENDING) & READ_NON_PRIMARY_AND_PUSHED
- return false
- }
- if (this.map !== null) {
- data = this.map(data)
- if (data === null) {
- stream._duplexState &= READ_PUSHED
- return this.buffered < this.highWaterMark
- }
- }
- this.buffered += this.byteLength(data)
- this.queue.push(data)
- stream._duplexState = (stream._duplexState | READ_QUEUED) & READ_PUSHED
- return this.buffered < this.highWaterMark
- }
- shift () {
- const data = this.queue.shift()
- this.buffered -= this.byteLength(data)
- if (this.buffered === 0) this.stream._duplexState &= READ_NOT_QUEUED
- return data
- }
- unshift (data) {
- const pending = [this.map !== null ? this.map(data) : data]
- while (this.buffered > 0) pending.push(this.shift())
- for (let i = 0; i < pending.length - 1; i++) {
- const data = pending[i]
- this.buffered += this.byteLength(data)
- this.queue.push(data)
- }
- this.push(pending[pending.length - 1])
- }
- read () {
- const stream = this.stream
- if ((stream._duplexState & READ_STATUS) === READ_QUEUED) {
- const data = this.shift()
- if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED
- if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)
- return data
- }
- if (this.readAhead === false) {
- stream._duplexState |= READ_READ_AHEAD
- this.updateNextTick()
- }
- return null
- }
- drain () {
- const stream = this.stream
- while ((stream._duplexState & READ_STATUS) === READ_QUEUED && (stream._duplexState & READ_FLOWING) !== 0) {
- const data = this.shift()
- if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED
- if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)
- }
- }
- update () {
- const stream = this.stream
- stream._duplexState |= READ_UPDATING
- do {
- this.drain()
- while (this.buffered < this.highWaterMark && (stream._duplexState & SHOULD_NOT_READ) === READ_READ_AHEAD) {
- stream._duplexState |= READ_ACTIVE_AND_NEEDS_PUSH
- stream._read(this.afterRead)
- this.drain()
- }
- if ((stream._duplexState & READ_READABLE_STATUS) === READ_EMIT_READABLE_AND_QUEUED) {
- stream._duplexState |= READ_EMITTED_READABLE
- stream.emit('readable')
- }
- if ((stream._duplexState & READ_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
- } while (this.continueUpdate() === true)
- stream._duplexState &= READ_NOT_UPDATING
- }
- updateNonPrimary () {
- const stream = this.stream
- if ((stream._duplexState & READ_ENDING_STATUS) === READ_ENDING) {
- stream._duplexState = (stream._duplexState | READ_DONE) & READ_NOT_ENDING
- stream.emit('end')
- if ((stream._duplexState & AUTO_DESTROY) === DONE) stream._duplexState |= DESTROYING
- if (this.pipeTo !== null) this.pipeTo.end()
- }
- if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {
- if ((stream._duplexState & ACTIVE_OR_TICKING) === 0) {
- stream._duplexState |= ACTIVE
- stream._destroy(afterDestroy.bind(this))
- }
- return
- }
- if ((stream._duplexState & IS_OPENING) === OPENING) {
- stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
- stream._open(afterOpen.bind(this))
- }
- }
- continueUpdate () {
- if ((this.stream._duplexState & READ_NEXT_TICK) === 0) return false
- this.stream._duplexState &= READ_NOT_NEXT_TICK
- return true
- }
- updateCallback () {
- if ((this.stream._duplexState & READ_UPDATE_SYNC_STATUS) === READ_PRIMARY) this.update()
- else this.updateNextTick()
- }
- updateNextTickIfOpen () {
- if ((this.stream._duplexState & READ_NEXT_TICK_OR_OPENING) !== 0) return
- this.stream._duplexState |= READ_NEXT_TICK
- if ((this.stream._duplexState & READ_UPDATING) === 0) qmt(this.afterUpdateNextTick)
- }
- updateNextTick () {
- if ((this.stream._duplexState & READ_NEXT_TICK) !== 0) return
- this.stream._duplexState |= READ_NEXT_TICK
- if ((this.stream._duplexState & READ_UPDATING) === 0) qmt(this.afterUpdateNextTick)
- }
- }
- class TransformState {
- constructor (stream) {
- this.data = null
- this.afterTransform = afterTransform.bind(stream)
- this.afterFinal = null
- }
- }
- class Pipeline {
- constructor (src, dst, cb) {
- this.from = src
- this.to = dst
- this.afterPipe = cb
- this.error = null
- this.pipeToFinished = false
- }
- finished () {
- this.pipeToFinished = true
- }
- done (stream, err) {
- if (err) this.error = err
- if (stream === this.to) {
- this.to = null
- if (this.from !== null) {
- if ((this.from._duplexState & READ_DONE) === 0 || !this.pipeToFinished) {
- this.from.destroy(this.error || new Error('Writable stream closed prematurely'))
- }
- return
- }
- }
- if (stream === this.from) {
- this.from = null
- if (this.to !== null) {
- if ((stream._duplexState & READ_DONE) === 0) {
- this.to.destroy(this.error || new Error('Readable stream closed before ending'))
- }
- return
- }
- }
- if (this.afterPipe !== null) this.afterPipe(this.error)
- this.to = this.from = this.afterPipe = null
- }
- }
- function afterDrain () {
- this.stream._duplexState |= READ_PIPE_DRAINED
- this.updateCallback()
- }
- function afterFinal (err) {
- const stream = this.stream
- if (err) stream.destroy(err)
- if ((stream._duplexState & DESTROY_STATUS) === 0) {
- stream._duplexState |= WRITE_DONE
- stream.emit('finish')
- }
- if ((stream._duplexState & AUTO_DESTROY) === DONE) {
- stream._duplexState |= DESTROYING
- }
- stream._duplexState &= WRITE_NOT_FINISHING
- // no need to wait the extra tick here, so we short circuit that
- if ((stream._duplexState & WRITE_UPDATING) === 0) this.update()
- else this.updateNextTick()
- }
- function afterDestroy (err) {
- const stream = this.stream
- if (!err && this.error !== STREAM_DESTROYED) err = this.error
- if (err) stream.emit('error', err)
- stream._duplexState |= DESTROYED
- stream.emit('close')
- const rs = stream._readableState
- const ws = stream._writableState
- if (rs !== null && rs.pipeline !== null) rs.pipeline.done(stream, err)
- if (ws !== null) {
- while (ws.drains !== null && ws.drains.length > 0) ws.drains.shift().resolve(false)
- if (ws.pipeline !== null) ws.pipeline.done(stream, err)
- }
- }
- function afterWrite (err) {
- const stream = this.stream
- if (err) stream.destroy(err)
- stream._duplexState &= WRITE_NOT_ACTIVE
- if (this.drains !== null) tickDrains(this.drains)
- if ((stream._duplexState & WRITE_DRAIN_STATUS) === WRITE_UNDRAINED) {
- stream._duplexState &= WRITE_DRAINED
- if ((stream._duplexState & WRITE_EMIT_DRAIN) === WRITE_EMIT_DRAIN) {
- stream.emit('drain')
- }
- }
- this.updateCallback()
- }
- function afterRead (err) {
- if (err) this.stream.destroy(err)
- this.stream._duplexState &= READ_NOT_ACTIVE
- if (this.readAhead === false && (this.stream._duplexState & READ_RESUMED) === 0) this.stream._duplexState &= READ_NO_READ_AHEAD
- this.updateCallback()
- }
- function updateReadNT () {
- if ((this.stream._duplexState & READ_UPDATING) === 0) {
- this.stream._duplexState &= READ_NOT_NEXT_TICK
- this.update()
- }
- }
- function updateWriteNT () {
- if ((this.stream._duplexState & WRITE_UPDATING) === 0) {
- this.stream._duplexState &= WRITE_NOT_NEXT_TICK
- this.update()
- }
- }
- function tickDrains (drains) {
- for (let i = 0; i < drains.length; i++) {
- // drains.writes are monotonic, so if one is 0 its always the first one
- if (--drains[i].writes === 0) {
- drains.shift().resolve(true)
- i--
- }
- }
- }
- function afterOpen (err) {
- const stream = this.stream
- if (err) stream.destroy(err)
- if ((stream._duplexState & DESTROYING) === 0) {
- if ((stream._duplexState & READ_PRIMARY_STATUS) === 0) stream._duplexState |= READ_PRIMARY
- if ((stream._duplexState & WRITE_PRIMARY_STATUS) === 0) stream._duplexState |= WRITE_PRIMARY
- stream.emit('open')
- }
- stream._duplexState &= NOT_ACTIVE
- if (stream._writableState !== null) {
- stream._writableState.updateCallback()
- }
- if (stream._readableState !== null) {
- stream._readableState.updateCallback()
- }
- }
- function afterTransform (err, data) {
- if (data !== undefined && data !== null) this.push(data)
- this._writableState.afterWrite(err)
- }
- function newListener (name) {
- if (this._readableState !== null) {
- if (name === 'data') {
- this._duplexState |= (READ_EMIT_DATA | READ_RESUMED_READ_AHEAD)
- this._readableState.updateNextTick()
- }
- if (name === 'readable') {
- this._duplexState |= READ_EMIT_READABLE
- this._readableState.updateNextTick()
- }
- }
- if (this._writableState !== null) {
- if (name === 'drain') {
- this._duplexState |= WRITE_EMIT_DRAIN
- this._writableState.updateNextTick()
- }
- }
- }
- class Stream extends EventEmitter {
- constructor (opts) {
- super()
- this._duplexState = 0
- this._readableState = null
- this._writableState = null
- if (opts) {
- if (opts.open) this._open = opts.open
- if (opts.destroy) this._destroy = opts.destroy
- if (opts.predestroy) this._predestroy = opts.predestroy
- if (opts.signal) {
- opts.signal.addEventListener('abort', abort.bind(this))
- }
- }
- this.on('newListener', newListener)
- }
- _open (cb) {
- cb(null)
- }
- _destroy (cb) {
- cb(null)
- }
- _predestroy () {
- // does nothing
- }
- get readable () {
- return this._readableState !== null ? true : undefined
- }
- get writable () {
- return this._writableState !== null ? true : undefined
- }
- get destroyed () {
- return (this._duplexState & DESTROYED) !== 0
- }
- get destroying () {
- return (this._duplexState & DESTROY_STATUS) !== 0
- }
- destroy (err) {
- if ((this._duplexState & DESTROY_STATUS) === 0) {
- if (!err) err = STREAM_DESTROYED
- this._duplexState = (this._duplexState | DESTROYING) & NON_PRIMARY
- if (this._readableState !== null) {
- this._readableState.highWaterMark = 0
- this._readableState.error = err
- }
- if (this._writableState !== null) {
- this._writableState.highWaterMark = 0
- this._writableState.error = err
- }
- this._duplexState |= PREDESTROYING
- this._predestroy()
- this._duplexState &= NOT_PREDESTROYING
- if (this._readableState !== null) this._readableState.updateNextTick()
- if (this._writableState !== null) this._writableState.updateNextTick()
- }
- }
- }
- class Readable extends Stream {
- constructor (opts) {
- super(opts)
- this._duplexState |= OPENING | WRITE_DONE | READ_READ_AHEAD
- this._readableState = new ReadableState(this, opts)
- if (opts) {
- if (this._readableState.readAhead === false) this._duplexState &= READ_NO_READ_AHEAD
- if (opts.read) this._read = opts.read
- if (opts.eagerOpen) this._readableState.updateNextTick()
- if (opts.encoding) this.setEncoding(opts.encoding)
- }
- }
- setEncoding (encoding) {
- const dec = new TextDecoder(encoding)
- const map = this._readableState.map || echo
- this._readableState.map = mapOrSkip
- return this
- function mapOrSkip (data) {
- const next = dec.push(data)
- return next === '' && (data.byteLength !== 0 || dec.remaining > 0) ? null : map(next)
- }
- }
- _read (cb) {
- cb(null)
- }
- pipe (dest, cb) {
- this._readableState.updateNextTick()
- this._readableState.pipe(dest, cb)
- return dest
- }
- read () {
- this._readableState.updateNextTick()
- return this._readableState.read()
- }
- push (data) {
- this._readableState.updateNextTickIfOpen()
- return this._readableState.push(data)
- }
- unshift (data) {
- this._readableState.updateNextTickIfOpen()
- return this._readableState.unshift(data)
- }
- resume () {
- this._duplexState |= READ_RESUMED_READ_AHEAD
- this._readableState.updateNextTick()
- return this
- }
- pause () {
- this._duplexState &= (this._readableState.readAhead === false ? READ_PAUSED_NO_READ_AHEAD : READ_PAUSED)
- return this
- }
- static _fromAsyncIterator (ite, opts) {
- let destroy
- const rs = new Readable({
- ...opts,
- read (cb) {
- ite.next().then(push).then(cb.bind(null, null)).catch(cb)
- },
- predestroy () {
- destroy = ite.return()
- },
- destroy (cb) {
- if (!destroy) return cb(null)
- destroy.then(cb.bind(null, null)).catch(cb)
- }
- })
- return rs
- function push (data) {
- if (data.done) rs.push(null)
- else rs.push(data.value)
- }
- }
- static from (data, opts) {
- if (isReadStreamx(data)) return data
- if (data[asyncIterator]) return this._fromAsyncIterator(data[asyncIterator](), opts)
- if (!Array.isArray(data)) data = data === undefined ? [] : [data]
- let i = 0
- return new Readable({
- ...opts,
- read (cb) {
- this.push(i === data.length ? null : data[i++])
- cb(null)
- }
- })
- }
- static isBackpressured (rs) {
- return (rs._duplexState & READ_BACKPRESSURE_STATUS) !== 0 || rs._readableState.buffered >= rs._readableState.highWaterMark
- }
- static isPaused (rs) {
- return (rs._duplexState & READ_RESUMED) === 0
- }
- [asyncIterator] () {
- const stream = this
- let error = null
- let promiseResolve = null
- let promiseReject = null
- this.on('error', (err) => { error = err })
- this.on('readable', onreadable)
- this.on('close', onclose)
- return {
- [asyncIterator] () {
- return this
- },
- next () {
- return new Promise(function (resolve, reject) {
- promiseResolve = resolve
- promiseReject = reject
- const data = stream.read()
- if (data !== null) ondata(data)
- else if ((stream._duplexState & DESTROYED) !== 0) ondata(null)
- })
- },
- return () {
- return destroy(null)
- },
- throw (err) {
- return destroy(err)
- }
- }
- function onreadable () {
- if (promiseResolve !== null) ondata(stream.read())
- }
- function onclose () {
- if (promiseResolve !== null) ondata(null)
- }
- function ondata (data) {
- if (promiseReject === null) return
- if (error) promiseReject(error)
- else if (data === null && (stream._duplexState & READ_DONE) === 0) promiseReject(STREAM_DESTROYED)
- else promiseResolve({ value: data, done: data === null })
- promiseReject = promiseResolve = null
- }
- function destroy (err) {
- stream.destroy(err)
- return new Promise((resolve, reject) => {
- if (stream._duplexState & DESTROYED) return resolve({ value: undefined, done: true })
- stream.once('close', function () {
- if (err) reject(err)
- else resolve({ value: undefined, done: true })
- })
- })
- }
- }
- }
- class Writable extends Stream {
- constructor (opts) {
- super(opts)
- this._duplexState |= OPENING | READ_DONE
- this._writableState = new WritableState(this, opts)
- if (opts) {
- if (opts.writev) this._writev = opts.writev
- if (opts.write) this._write = opts.write
- if (opts.final) this._final = opts.final
- if (opts.eagerOpen) this._writableState.updateNextTick()
- }
- }
- cork () {
- this._duplexState |= WRITE_CORKED
- }
- uncork () {
- this._duplexState &= WRITE_NOT_CORKED
- this._writableState.updateNextTick()
- }
- _writev (batch, cb) {
- cb(null)
- }
- _write (data, cb) {
- this._writableState.autoBatch(data, cb)
- }
- _final (cb) {
- cb(null)
- }
- static isBackpressured (ws) {
- return (ws._duplexState & WRITE_BACKPRESSURE_STATUS) !== 0
- }
- static drained (ws) {
- if (ws.destroyed) return Promise.resolve(false)
- const state = ws._writableState
- const pending = (isWritev(ws) ? Math.min(1, state.queue.length) : state.queue.length)
- const writes = pending + ((ws._duplexState & WRITE_WRITING) ? 1 : 0)
- if (writes === 0) return Promise.resolve(true)
- if (state.drains === null) state.drains = []
- return new Promise((resolve) => {
- state.drains.push({ writes, resolve })
- })
- }
- write (data) {
- this._writableState.updateNextTick()
- return this._writableState.push(data)
- }
- end (data) {
- this._writableState.updateNextTick()
- this._writableState.end(data)
- return this
- }
- }
- class Duplex extends Readable { // and Writable
- constructor (opts) {
- super(opts)
- this._duplexState = OPENING | (this._duplexState & READ_READ_AHEAD)
- this._writableState = new WritableState(this, opts)
- if (opts) {
- if (opts.writev) this._writev = opts.writev
- if (opts.write) this._write = opts.write
- if (opts.final) this._final = opts.final
- }
- }
- cork () {
- this._duplexState |= WRITE_CORKED
- }
- uncork () {
- this._duplexState &= WRITE_NOT_CORKED
- this._writableState.updateNextTick()
- }
- _writev (batch, cb) {
- cb(null)
- }
- _write (data, cb) {
- this._writableState.autoBatch(data, cb)
- }
- _final (cb) {
- cb(null)
- }
- write (data) {
- this._writableState.updateNextTick()
- return this._writableState.push(data)
- }
- end (data) {
- this._writableState.updateNextTick()
- this._writableState.end(data)
- return this
- }
- }
- class Transform extends Duplex {
- constructor (opts) {
- super(opts)
- this._transformState = new TransformState(this)
- if (opts) {
- if (opts.transform) this._transform = opts.transform
- if (opts.flush) this._flush = opts.flush
- }
- }
- _write (data, cb) {
- if (this._readableState.buffered >= this._readableState.highWaterMark) {
- this._transformState.data = data
- } else {
- this._transform(data, this._transformState.afterTransform)
- }
- }
- _read (cb) {
- if (this._transformState.data !== null) {
- const data = this._transformState.data
- this._transformState.data = null
- cb(null)
- this._transform(data, this._transformState.afterTransform)
- } else {
- cb(null)
- }
- }
- destroy (err) {
- super.destroy(err)
- if (this._transformState.data !== null) {
- this._transformState.data = null
- this._transformState.afterTransform()
- }
- }
- _transform (data, cb) {
- cb(null, data)
- }
- _flush (cb) {
- cb(null)
- }
- _final (cb) {
- this._transformState.afterFinal = cb
- this._flush(transformAfterFlush.bind(this))
- }
- }
- class PassThrough extends Transform {}
- function transformAfterFlush (err, data) {
- const cb = this._transformState.afterFinal
- if (err) return cb(err)
- if (data !== null && data !== undefined) this.push(data)
- this.push(null)
- cb(null)
- }
- function pipelinePromise (...streams) {
- return new Promise((resolve, reject) => {
- return pipeline(...streams, (err) => {
- if (err) return reject(err)
- resolve()
- })
- })
- }
- function pipeline (stream, ...streams) {
- const all = Array.isArray(stream) ? [...stream, ...streams] : [stream, ...streams]
- const done = (all.length && typeof all[all.length - 1] === 'function') ? all.pop() : null
- if (all.length < 2) throw new Error('Pipeline requires at least 2 streams')
- let src = all[0]
- let dest = null
- let error = null
- for (let i = 1; i < all.length; i++) {
- dest = all[i]
- if (isStreamx(src)) {
- src.pipe(dest, onerror)
- } else {
- errorHandle(src, true, i > 1, onerror)
- src.pipe(dest)
- }
- src = dest
- }
- if (done) {
- let fin = false
- const autoDestroy = isStreamx(dest) || !!(dest._writableState && dest._writableState.autoDestroy)
- dest.on('error', (err) => {
- if (error === null) error = err
- })
- dest.on('finish', () => {
- fin = true
- if (!autoDestroy) done(error)
- })
- if (autoDestroy) {
- dest.on('close', () => done(error || (fin ? null : PREMATURE_CLOSE)))
- }
- }
- return dest
- function errorHandle (s, rd, wr, onerror) {
- s.on('error', onerror)
- s.on('close', onclose)
- function onclose () {
- if (rd && s._readableState && !s._readableState.ended) return onerror(PREMATURE_CLOSE)
- if (wr && s._writableState && !s._writableState.ended) return onerror(PREMATURE_CLOSE)
- }
- }
- function onerror (err) {
- if (!err || error) return
- error = err
- for (const s of all) {
- s.destroy(err)
- }
- }
- }
- function echo (s) {
- return s
- }
- function isStream (stream) {
- return !!stream._readableState || !!stream._writableState
- }
- function isStreamx (stream) {
- return typeof stream._duplexState === 'number' && isStream(stream)
- }
- function isEnded (stream) {
- return !!stream._readableState && stream._readableState.ended
- }
- function isFinished (stream) {
- return !!stream._writableState && stream._writableState.ended
- }
- function getStreamError (stream, opts = {}) {
- const err = (stream._readableState && stream._readableState.error) || (stream._writableState && stream._writableState.error)
- // avoid implicit errors by default
- return (!opts.all && err === STREAM_DESTROYED) ? null : err
- }
- function isReadStreamx (stream) {
- return isStreamx(stream) && stream.readable
- }
- function isDisturbed (stream) {
- return (stream._duplexState & OPENING) !== OPENING || (stream._duplexState & ACTIVE_OR_TICKING) !== 0
- }
- function isTypedArray (data) {
- return typeof data === 'object' && data !== null && typeof data.byteLength === 'number'
- }
- function defaultByteLength (data) {
- return isTypedArray(data) ? data.byteLength : 1024
- }
- function noop () {}
- function abort () {
- this.destroy(new Error('Stream aborted.'))
- }
- function isWritev (s) {
- return s._writev !== Writable.prototype._writev && s._writev !== Duplex.prototype._writev
- }
- module.exports = {
- pipeline,
- pipelinePromise,
- isStream,
- isStreamx,
- isEnded,
- isFinished,
- isDisturbed,
- getStreamError,
- Stream,
- Writable,
- Readable,
- Duplex,
- Transform,
- // Export PassThrough for compatibility with Node.js core's stream module
- PassThrough
- }
|