index.js 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184
  1. const { EventEmitter } = require('events-universal')
  2. const STREAM_DESTROYED = new Error('Stream was destroyed')
  3. const PREMATURE_CLOSE = new Error('Premature close')
  4. const FIFO = require('fast-fifo')
  5. const TextDecoder = require('text-decoder')
  6. // if we do a future major, expect queue microtask to be there always, for now a bit defensive
  7. const qmt = typeof queueMicrotask === 'undefined' ? fn => global.process.nextTick(fn) : queueMicrotask
  8. /* eslint-disable no-multi-spaces */
  9. // 29 bits used total (4 from shared, 14 from read, and 11 from write)
  10. const MAX = ((1 << 29) - 1)
  11. // Shared state
  12. const OPENING = 0b0001
  13. const PREDESTROYING = 0b0010
  14. const DESTROYING = 0b0100
  15. const DESTROYED = 0b1000
  16. const NOT_OPENING = MAX ^ OPENING
  17. const NOT_PREDESTROYING = MAX ^ PREDESTROYING
  18. // Read state (4 bit offset from shared state)
  19. const READ_ACTIVE = 0b00000000000001 << 4
  20. const READ_UPDATING = 0b00000000000010 << 4
  21. const READ_PRIMARY = 0b00000000000100 << 4
  22. const READ_QUEUED = 0b00000000001000 << 4
  23. const READ_RESUMED = 0b00000000010000 << 4
  24. const READ_PIPE_DRAINED = 0b00000000100000 << 4
  25. const READ_ENDING = 0b00000001000000 << 4
  26. const READ_EMIT_DATA = 0b00000010000000 << 4
  27. const READ_EMIT_READABLE = 0b00000100000000 << 4
  28. const READ_EMITTED_READABLE = 0b00001000000000 << 4
  29. const READ_DONE = 0b00010000000000 << 4
  30. const READ_NEXT_TICK = 0b00100000000000 << 4
  31. const READ_NEEDS_PUSH = 0b01000000000000 << 4
  32. const READ_READ_AHEAD = 0b10000000000000 << 4
  33. // Combined read state
  34. const READ_FLOWING = READ_RESUMED | READ_PIPE_DRAINED
  35. const READ_ACTIVE_AND_NEEDS_PUSH = READ_ACTIVE | READ_NEEDS_PUSH
  36. const READ_PRIMARY_AND_ACTIVE = READ_PRIMARY | READ_ACTIVE
  37. const READ_EMIT_READABLE_AND_QUEUED = READ_EMIT_READABLE | READ_QUEUED
  38. const READ_RESUMED_READ_AHEAD = READ_RESUMED | READ_READ_AHEAD
  39. const READ_NOT_ACTIVE = MAX ^ READ_ACTIVE
  40. const READ_NON_PRIMARY = MAX ^ READ_PRIMARY
  41. const READ_NON_PRIMARY_AND_PUSHED = MAX ^ (READ_PRIMARY | READ_NEEDS_PUSH)
  42. const READ_PUSHED = MAX ^ READ_NEEDS_PUSH
  43. const READ_PAUSED = MAX ^ READ_RESUMED
  44. const READ_NOT_QUEUED = MAX ^ (READ_QUEUED | READ_EMITTED_READABLE)
  45. const READ_NOT_ENDING = MAX ^ READ_ENDING
  46. const READ_PIPE_NOT_DRAINED = MAX ^ READ_FLOWING
  47. const READ_NOT_NEXT_TICK = MAX ^ READ_NEXT_TICK
  48. const READ_NOT_UPDATING = MAX ^ READ_UPDATING
  49. const READ_NO_READ_AHEAD = MAX ^ READ_READ_AHEAD
  50. const READ_PAUSED_NO_READ_AHEAD = MAX ^ READ_RESUMED_READ_AHEAD
  51. // Write state (18 bit offset, 4 bit offset from shared state and 14 from read state)
  52. const WRITE_ACTIVE = 0b00000000001 << 18
  53. const WRITE_UPDATING = 0b00000000010 << 18
  54. const WRITE_PRIMARY = 0b00000000100 << 18
  55. const WRITE_QUEUED = 0b00000001000 << 18
  56. const WRITE_UNDRAINED = 0b00000010000 << 18
  57. const WRITE_DONE = 0b00000100000 << 18
  58. const WRITE_EMIT_DRAIN = 0b00001000000 << 18
  59. const WRITE_NEXT_TICK = 0b00010000000 << 18
  60. const WRITE_WRITING = 0b00100000000 << 18
  61. const WRITE_FINISHING = 0b01000000000 << 18
  62. const WRITE_CORKED = 0b10000000000 << 18
  63. const WRITE_NOT_ACTIVE = MAX ^ (WRITE_ACTIVE | WRITE_WRITING)
  64. const WRITE_NON_PRIMARY = MAX ^ WRITE_PRIMARY
  65. const WRITE_NOT_FINISHING = MAX ^ (WRITE_ACTIVE | WRITE_FINISHING)
  66. const WRITE_DRAINED = MAX ^ WRITE_UNDRAINED
  67. const WRITE_NOT_QUEUED = MAX ^ WRITE_QUEUED
  68. const WRITE_NOT_NEXT_TICK = MAX ^ WRITE_NEXT_TICK
  69. const WRITE_NOT_UPDATING = MAX ^ WRITE_UPDATING
  70. const WRITE_NOT_CORKED = MAX ^ WRITE_CORKED
  71. // Combined shared state
  72. const ACTIVE = READ_ACTIVE | WRITE_ACTIVE
  73. const NOT_ACTIVE = MAX ^ ACTIVE
  74. const DONE = READ_DONE | WRITE_DONE
  75. const DESTROY_STATUS = DESTROYING | DESTROYED | PREDESTROYING
  76. const OPEN_STATUS = DESTROY_STATUS | OPENING
  77. const AUTO_DESTROY = DESTROY_STATUS | DONE
  78. const NON_PRIMARY = WRITE_NON_PRIMARY & READ_NON_PRIMARY
  79. const ACTIVE_OR_TICKING = WRITE_NEXT_TICK | READ_NEXT_TICK
  80. const TICKING = ACTIVE_OR_TICKING & NOT_ACTIVE
  81. const IS_OPENING = OPEN_STATUS | TICKING
  82. // Combined shared state and read state
  83. const READ_PRIMARY_STATUS = OPEN_STATUS | READ_ENDING | READ_DONE
  84. const READ_STATUS = OPEN_STATUS | READ_DONE | READ_QUEUED
  85. const READ_ENDING_STATUS = OPEN_STATUS | READ_ENDING | READ_QUEUED
  86. const READ_READABLE_STATUS = OPEN_STATUS | READ_EMIT_READABLE | READ_QUEUED | READ_EMITTED_READABLE
  87. const SHOULD_NOT_READ = OPEN_STATUS | READ_ACTIVE | READ_ENDING | READ_DONE | READ_NEEDS_PUSH | READ_READ_AHEAD
  88. const READ_BACKPRESSURE_STATUS = DESTROY_STATUS | READ_ENDING | READ_DONE
  89. const READ_UPDATE_SYNC_STATUS = READ_UPDATING | OPEN_STATUS | READ_NEXT_TICK | READ_PRIMARY
  90. const READ_NEXT_TICK_OR_OPENING = READ_NEXT_TICK | OPENING
  91. // Combined write state
  92. const WRITE_PRIMARY_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_DONE
  93. const WRITE_QUEUED_AND_UNDRAINED = WRITE_QUEUED | WRITE_UNDRAINED
  94. const WRITE_QUEUED_AND_ACTIVE = WRITE_QUEUED | WRITE_ACTIVE
  95. const WRITE_DRAIN_STATUS = WRITE_QUEUED | WRITE_UNDRAINED | OPEN_STATUS | WRITE_ACTIVE
  96. const WRITE_STATUS = OPEN_STATUS | WRITE_ACTIVE | WRITE_QUEUED | WRITE_CORKED
  97. const WRITE_PRIMARY_AND_ACTIVE = WRITE_PRIMARY | WRITE_ACTIVE
  98. const WRITE_ACTIVE_AND_WRITING = WRITE_ACTIVE | WRITE_WRITING
  99. const WRITE_FINISHING_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_QUEUED_AND_ACTIVE | WRITE_DONE
  100. const WRITE_BACKPRESSURE_STATUS = WRITE_UNDRAINED | DESTROY_STATUS | WRITE_FINISHING | WRITE_DONE
  101. const WRITE_UPDATE_SYNC_STATUS = WRITE_UPDATING | OPEN_STATUS | WRITE_NEXT_TICK | WRITE_PRIMARY
  102. const WRITE_DROP_DATA = WRITE_FINISHING | WRITE_DONE | DESTROY_STATUS
  103. const asyncIterator = Symbol.asyncIterator || Symbol('asyncIterator')
  104. class WritableState {
  105. constructor (stream, { highWaterMark = 16384, map = null, mapWritable, byteLength, byteLengthWritable } = {}) {
  106. this.stream = stream
  107. this.queue = new FIFO()
  108. this.highWaterMark = highWaterMark
  109. this.buffered = 0
  110. this.error = null
  111. this.pipeline = null
  112. this.drains = null // if we add more seldomly used helpers we might them into a subobject so its a single ptr
  113. this.byteLength = byteLengthWritable || byteLength || defaultByteLength
  114. this.map = mapWritable || map
  115. this.afterWrite = afterWrite.bind(this)
  116. this.afterUpdateNextTick = updateWriteNT.bind(this)
  117. }
  118. get ended () {
  119. return (this.stream._duplexState & WRITE_DONE) !== 0
  120. }
  121. push (data) {
  122. if ((this.stream._duplexState & WRITE_DROP_DATA) !== 0) return false
  123. if (this.map !== null) data = this.map(data)
  124. this.buffered += this.byteLength(data)
  125. this.queue.push(data)
  126. if (this.buffered < this.highWaterMark) {
  127. this.stream._duplexState |= WRITE_QUEUED
  128. return true
  129. }
  130. this.stream._duplexState |= WRITE_QUEUED_AND_UNDRAINED
  131. return false
  132. }
  133. shift () {
  134. const data = this.queue.shift()
  135. this.buffered -= this.byteLength(data)
  136. if (this.buffered === 0) this.stream._duplexState &= WRITE_NOT_QUEUED
  137. return data
  138. }
  139. end (data) {
  140. if (typeof data === 'function') this.stream.once('finish', data)
  141. else if (data !== undefined && data !== null) this.push(data)
  142. this.stream._duplexState = (this.stream._duplexState | WRITE_FINISHING) & WRITE_NON_PRIMARY
  143. }
  144. autoBatch (data, cb) {
  145. const buffer = []
  146. const stream = this.stream
  147. buffer.push(data)
  148. while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED_AND_ACTIVE) {
  149. buffer.push(stream._writableState.shift())
  150. }
  151. if ((stream._duplexState & OPEN_STATUS) !== 0) return cb(null)
  152. stream._writev(buffer, cb)
  153. }
  154. update () {
  155. const stream = this.stream
  156. stream._duplexState |= WRITE_UPDATING
  157. do {
  158. while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED) {
  159. const data = this.shift()
  160. stream._duplexState |= WRITE_ACTIVE_AND_WRITING
  161. stream._write(data, this.afterWrite)
  162. }
  163. if ((stream._duplexState & WRITE_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
  164. } while (this.continueUpdate() === true)
  165. stream._duplexState &= WRITE_NOT_UPDATING
  166. }
  167. updateNonPrimary () {
  168. const stream = this.stream
  169. if ((stream._duplexState & WRITE_FINISHING_STATUS) === WRITE_FINISHING) {
  170. stream._duplexState = stream._duplexState | WRITE_ACTIVE
  171. stream._final(afterFinal.bind(this))
  172. return
  173. }
  174. if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {
  175. if ((stream._duplexState & ACTIVE_OR_TICKING) === 0) {
  176. stream._duplexState |= ACTIVE
  177. stream._destroy(afterDestroy.bind(this))
  178. }
  179. return
  180. }
  181. if ((stream._duplexState & IS_OPENING) === OPENING) {
  182. stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
  183. stream._open(afterOpen.bind(this))
  184. }
  185. }
  186. continueUpdate () {
  187. if ((this.stream._duplexState & WRITE_NEXT_TICK) === 0) return false
  188. this.stream._duplexState &= WRITE_NOT_NEXT_TICK
  189. return true
  190. }
  191. updateCallback () {
  192. if ((this.stream._duplexState & WRITE_UPDATE_SYNC_STATUS) === WRITE_PRIMARY) this.update()
  193. else this.updateNextTick()
  194. }
  195. updateNextTick () {
  196. if ((this.stream._duplexState & WRITE_NEXT_TICK) !== 0) return
  197. this.stream._duplexState |= WRITE_NEXT_TICK
  198. if ((this.stream._duplexState & WRITE_UPDATING) === 0) qmt(this.afterUpdateNextTick)
  199. }
  200. }
  201. class ReadableState {
  202. constructor (stream, { highWaterMark = 16384, map = null, mapReadable, byteLength, byteLengthReadable } = {}) {
  203. this.stream = stream
  204. this.queue = new FIFO()
  205. this.highWaterMark = highWaterMark === 0 ? 1 : highWaterMark
  206. this.buffered = 0
  207. this.readAhead = highWaterMark > 0
  208. this.error = null
  209. this.pipeline = null
  210. this.byteLength = byteLengthReadable || byteLength || defaultByteLength
  211. this.map = mapReadable || map
  212. this.pipeTo = null
  213. this.afterRead = afterRead.bind(this)
  214. this.afterUpdateNextTick = updateReadNT.bind(this)
  215. }
  216. get ended () {
  217. return (this.stream._duplexState & READ_DONE) !== 0
  218. }
  219. pipe (pipeTo, cb) {
  220. if (this.pipeTo !== null) throw new Error('Can only pipe to one destination')
  221. if (typeof cb !== 'function') cb = null
  222. this.stream._duplexState |= READ_PIPE_DRAINED
  223. this.pipeTo = pipeTo
  224. this.pipeline = new Pipeline(this.stream, pipeTo, cb)
  225. if (cb) this.stream.on('error', noop) // We already error handle this so supress crashes
  226. if (isStreamx(pipeTo)) {
  227. pipeTo._writableState.pipeline = this.pipeline
  228. if (cb) pipeTo.on('error', noop) // We already error handle this so supress crashes
  229. pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline)) // TODO: just call finished from pipeTo itself
  230. } else {
  231. const onerror = this.pipeline.done.bind(this.pipeline, pipeTo)
  232. const onclose = this.pipeline.done.bind(this.pipeline, pipeTo, null) // onclose has a weird bool arg
  233. pipeTo.on('error', onerror)
  234. pipeTo.on('close', onclose)
  235. pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline))
  236. }
  237. pipeTo.on('drain', afterDrain.bind(this))
  238. this.stream.emit('piping', pipeTo)
  239. pipeTo.emit('pipe', this.stream)
  240. }
  241. push (data) {
  242. const stream = this.stream
  243. if (data === null) {
  244. this.highWaterMark = 0
  245. stream._duplexState = (stream._duplexState | READ_ENDING) & READ_NON_PRIMARY_AND_PUSHED
  246. return false
  247. }
  248. if (this.map !== null) {
  249. data = this.map(data)
  250. if (data === null) {
  251. stream._duplexState &= READ_PUSHED
  252. return this.buffered < this.highWaterMark
  253. }
  254. }
  255. this.buffered += this.byteLength(data)
  256. this.queue.push(data)
  257. stream._duplexState = (stream._duplexState | READ_QUEUED) & READ_PUSHED
  258. return this.buffered < this.highWaterMark
  259. }
  260. shift () {
  261. const data = this.queue.shift()
  262. this.buffered -= this.byteLength(data)
  263. if (this.buffered === 0) this.stream._duplexState &= READ_NOT_QUEUED
  264. return data
  265. }
  266. unshift (data) {
  267. const pending = [this.map !== null ? this.map(data) : data]
  268. while (this.buffered > 0) pending.push(this.shift())
  269. for (let i = 0; i < pending.length - 1; i++) {
  270. const data = pending[i]
  271. this.buffered += this.byteLength(data)
  272. this.queue.push(data)
  273. }
  274. this.push(pending[pending.length - 1])
  275. }
  276. read () {
  277. const stream = this.stream
  278. if ((stream._duplexState & READ_STATUS) === READ_QUEUED) {
  279. const data = this.shift()
  280. if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED
  281. if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)
  282. return data
  283. }
  284. if (this.readAhead === false) {
  285. stream._duplexState |= READ_READ_AHEAD
  286. this.updateNextTick()
  287. }
  288. return null
  289. }
  290. drain () {
  291. const stream = this.stream
  292. while ((stream._duplexState & READ_STATUS) === READ_QUEUED && (stream._duplexState & READ_FLOWING) !== 0) {
  293. const data = this.shift()
  294. if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED
  295. if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)
  296. }
  297. }
  298. update () {
  299. const stream = this.stream
  300. stream._duplexState |= READ_UPDATING
  301. do {
  302. this.drain()
  303. while (this.buffered < this.highWaterMark && (stream._duplexState & SHOULD_NOT_READ) === READ_READ_AHEAD) {
  304. stream._duplexState |= READ_ACTIVE_AND_NEEDS_PUSH
  305. stream._read(this.afterRead)
  306. this.drain()
  307. }
  308. if ((stream._duplexState & READ_READABLE_STATUS) === READ_EMIT_READABLE_AND_QUEUED) {
  309. stream._duplexState |= READ_EMITTED_READABLE
  310. stream.emit('readable')
  311. }
  312. if ((stream._duplexState & READ_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()
  313. } while (this.continueUpdate() === true)
  314. stream._duplexState &= READ_NOT_UPDATING
  315. }
  316. updateNonPrimary () {
  317. const stream = this.stream
  318. if ((stream._duplexState & READ_ENDING_STATUS) === READ_ENDING) {
  319. stream._duplexState = (stream._duplexState | READ_DONE) & READ_NOT_ENDING
  320. stream.emit('end')
  321. if ((stream._duplexState & AUTO_DESTROY) === DONE) stream._duplexState |= DESTROYING
  322. if (this.pipeTo !== null) this.pipeTo.end()
  323. }
  324. if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {
  325. if ((stream._duplexState & ACTIVE_OR_TICKING) === 0) {
  326. stream._duplexState |= ACTIVE
  327. stream._destroy(afterDestroy.bind(this))
  328. }
  329. return
  330. }
  331. if ((stream._duplexState & IS_OPENING) === OPENING) {
  332. stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
  333. stream._open(afterOpen.bind(this))
  334. }
  335. }
  336. continueUpdate () {
  337. if ((this.stream._duplexState & READ_NEXT_TICK) === 0) return false
  338. this.stream._duplexState &= READ_NOT_NEXT_TICK
  339. return true
  340. }
  341. updateCallback () {
  342. if ((this.stream._duplexState & READ_UPDATE_SYNC_STATUS) === READ_PRIMARY) this.update()
  343. else this.updateNextTick()
  344. }
  345. updateNextTickIfOpen () {
  346. if ((this.stream._duplexState & READ_NEXT_TICK_OR_OPENING) !== 0) return
  347. this.stream._duplexState |= READ_NEXT_TICK
  348. if ((this.stream._duplexState & READ_UPDATING) === 0) qmt(this.afterUpdateNextTick)
  349. }
  350. updateNextTick () {
  351. if ((this.stream._duplexState & READ_NEXT_TICK) !== 0) return
  352. this.stream._duplexState |= READ_NEXT_TICK
  353. if ((this.stream._duplexState & READ_UPDATING) === 0) qmt(this.afterUpdateNextTick)
  354. }
  355. }
  356. class TransformState {
  357. constructor (stream) {
  358. this.data = null
  359. this.afterTransform = afterTransform.bind(stream)
  360. this.afterFinal = null
  361. }
  362. }
  363. class Pipeline {
  364. constructor (src, dst, cb) {
  365. this.from = src
  366. this.to = dst
  367. this.afterPipe = cb
  368. this.error = null
  369. this.pipeToFinished = false
  370. }
  371. finished () {
  372. this.pipeToFinished = true
  373. }
  374. done (stream, err) {
  375. if (err) this.error = err
  376. if (stream === this.to) {
  377. this.to = null
  378. if (this.from !== null) {
  379. if ((this.from._duplexState & READ_DONE) === 0 || !this.pipeToFinished) {
  380. this.from.destroy(this.error || new Error('Writable stream closed prematurely'))
  381. }
  382. return
  383. }
  384. }
  385. if (stream === this.from) {
  386. this.from = null
  387. if (this.to !== null) {
  388. if ((stream._duplexState & READ_DONE) === 0) {
  389. this.to.destroy(this.error || new Error('Readable stream closed before ending'))
  390. }
  391. return
  392. }
  393. }
  394. if (this.afterPipe !== null) this.afterPipe(this.error)
  395. this.to = this.from = this.afterPipe = null
  396. }
  397. }
  398. function afterDrain () {
  399. this.stream._duplexState |= READ_PIPE_DRAINED
  400. this.updateCallback()
  401. }
  402. function afterFinal (err) {
  403. const stream = this.stream
  404. if (err) stream.destroy(err)
  405. if ((stream._duplexState & DESTROY_STATUS) === 0) {
  406. stream._duplexState |= WRITE_DONE
  407. stream.emit('finish')
  408. }
  409. if ((stream._duplexState & AUTO_DESTROY) === DONE) {
  410. stream._duplexState |= DESTROYING
  411. }
  412. stream._duplexState &= WRITE_NOT_FINISHING
  413. // no need to wait the extra tick here, so we short circuit that
  414. if ((stream._duplexState & WRITE_UPDATING) === 0) this.update()
  415. else this.updateNextTick()
  416. }
  417. function afterDestroy (err) {
  418. const stream = this.stream
  419. if (!err && this.error !== STREAM_DESTROYED) err = this.error
  420. if (err) stream.emit('error', err)
  421. stream._duplexState |= DESTROYED
  422. stream.emit('close')
  423. const rs = stream._readableState
  424. const ws = stream._writableState
  425. if (rs !== null && rs.pipeline !== null) rs.pipeline.done(stream, err)
  426. if (ws !== null) {
  427. while (ws.drains !== null && ws.drains.length > 0) ws.drains.shift().resolve(false)
  428. if (ws.pipeline !== null) ws.pipeline.done(stream, err)
  429. }
  430. }
  431. function afterWrite (err) {
  432. const stream = this.stream
  433. if (err) stream.destroy(err)
  434. stream._duplexState &= WRITE_NOT_ACTIVE
  435. if (this.drains !== null) tickDrains(this.drains)
  436. if ((stream._duplexState & WRITE_DRAIN_STATUS) === WRITE_UNDRAINED) {
  437. stream._duplexState &= WRITE_DRAINED
  438. if ((stream._duplexState & WRITE_EMIT_DRAIN) === WRITE_EMIT_DRAIN) {
  439. stream.emit('drain')
  440. }
  441. }
  442. this.updateCallback()
  443. }
  444. function afterRead (err) {
  445. if (err) this.stream.destroy(err)
  446. this.stream._duplexState &= READ_NOT_ACTIVE
  447. if (this.readAhead === false && (this.stream._duplexState & READ_RESUMED) === 0) this.stream._duplexState &= READ_NO_READ_AHEAD
  448. this.updateCallback()
  449. }
  450. function updateReadNT () {
  451. if ((this.stream._duplexState & READ_UPDATING) === 0) {
  452. this.stream._duplexState &= READ_NOT_NEXT_TICK
  453. this.update()
  454. }
  455. }
  456. function updateWriteNT () {
  457. if ((this.stream._duplexState & WRITE_UPDATING) === 0) {
  458. this.stream._duplexState &= WRITE_NOT_NEXT_TICK
  459. this.update()
  460. }
  461. }
  462. function tickDrains (drains) {
  463. for (let i = 0; i < drains.length; i++) {
  464. // drains.writes are monotonic, so if one is 0 its always the first one
  465. if (--drains[i].writes === 0) {
  466. drains.shift().resolve(true)
  467. i--
  468. }
  469. }
  470. }
  471. function afterOpen (err) {
  472. const stream = this.stream
  473. if (err) stream.destroy(err)
  474. if ((stream._duplexState & DESTROYING) === 0) {
  475. if ((stream._duplexState & READ_PRIMARY_STATUS) === 0) stream._duplexState |= READ_PRIMARY
  476. if ((stream._duplexState & WRITE_PRIMARY_STATUS) === 0) stream._duplexState |= WRITE_PRIMARY
  477. stream.emit('open')
  478. }
  479. stream._duplexState &= NOT_ACTIVE
  480. if (stream._writableState !== null) {
  481. stream._writableState.updateCallback()
  482. }
  483. if (stream._readableState !== null) {
  484. stream._readableState.updateCallback()
  485. }
  486. }
  487. function afterTransform (err, data) {
  488. if (data !== undefined && data !== null) this.push(data)
  489. this._writableState.afterWrite(err)
  490. }
  491. function newListener (name) {
  492. if (this._readableState !== null) {
  493. if (name === 'data') {
  494. this._duplexState |= (READ_EMIT_DATA | READ_RESUMED_READ_AHEAD)
  495. this._readableState.updateNextTick()
  496. }
  497. if (name === 'readable') {
  498. this._duplexState |= READ_EMIT_READABLE
  499. this._readableState.updateNextTick()
  500. }
  501. }
  502. if (this._writableState !== null) {
  503. if (name === 'drain') {
  504. this._duplexState |= WRITE_EMIT_DRAIN
  505. this._writableState.updateNextTick()
  506. }
  507. }
  508. }
  509. class Stream extends EventEmitter {
  510. constructor (opts) {
  511. super()
  512. this._duplexState = 0
  513. this._readableState = null
  514. this._writableState = null
  515. if (opts) {
  516. if (opts.open) this._open = opts.open
  517. if (opts.destroy) this._destroy = opts.destroy
  518. if (opts.predestroy) this._predestroy = opts.predestroy
  519. if (opts.signal) {
  520. opts.signal.addEventListener('abort', abort.bind(this))
  521. }
  522. }
  523. this.on('newListener', newListener)
  524. }
  525. _open (cb) {
  526. cb(null)
  527. }
  528. _destroy (cb) {
  529. cb(null)
  530. }
  531. _predestroy () {
  532. // does nothing
  533. }
  534. get readable () {
  535. return this._readableState !== null ? true : undefined
  536. }
  537. get writable () {
  538. return this._writableState !== null ? true : undefined
  539. }
  540. get destroyed () {
  541. return (this._duplexState & DESTROYED) !== 0
  542. }
  543. get destroying () {
  544. return (this._duplexState & DESTROY_STATUS) !== 0
  545. }
  546. destroy (err) {
  547. if ((this._duplexState & DESTROY_STATUS) === 0) {
  548. if (!err) err = STREAM_DESTROYED
  549. this._duplexState = (this._duplexState | DESTROYING) & NON_PRIMARY
  550. if (this._readableState !== null) {
  551. this._readableState.highWaterMark = 0
  552. this._readableState.error = err
  553. }
  554. if (this._writableState !== null) {
  555. this._writableState.highWaterMark = 0
  556. this._writableState.error = err
  557. }
  558. this._duplexState |= PREDESTROYING
  559. this._predestroy()
  560. this._duplexState &= NOT_PREDESTROYING
  561. if (this._readableState !== null) this._readableState.updateNextTick()
  562. if (this._writableState !== null) this._writableState.updateNextTick()
  563. }
  564. }
  565. }
  566. class Readable extends Stream {
  567. constructor (opts) {
  568. super(opts)
  569. this._duplexState |= OPENING | WRITE_DONE | READ_READ_AHEAD
  570. this._readableState = new ReadableState(this, opts)
  571. if (opts) {
  572. if (this._readableState.readAhead === false) this._duplexState &= READ_NO_READ_AHEAD
  573. if (opts.read) this._read = opts.read
  574. if (opts.eagerOpen) this._readableState.updateNextTick()
  575. if (opts.encoding) this.setEncoding(opts.encoding)
  576. }
  577. }
  578. setEncoding (encoding) {
  579. const dec = new TextDecoder(encoding)
  580. const map = this._readableState.map || echo
  581. this._readableState.map = mapOrSkip
  582. return this
  583. function mapOrSkip (data) {
  584. const next = dec.push(data)
  585. return next === '' && (data.byteLength !== 0 || dec.remaining > 0) ? null : map(next)
  586. }
  587. }
  588. _read (cb) {
  589. cb(null)
  590. }
  591. pipe (dest, cb) {
  592. this._readableState.updateNextTick()
  593. this._readableState.pipe(dest, cb)
  594. return dest
  595. }
  596. read () {
  597. this._readableState.updateNextTick()
  598. return this._readableState.read()
  599. }
  600. push (data) {
  601. this._readableState.updateNextTickIfOpen()
  602. return this._readableState.push(data)
  603. }
  604. unshift (data) {
  605. this._readableState.updateNextTickIfOpen()
  606. return this._readableState.unshift(data)
  607. }
  608. resume () {
  609. this._duplexState |= READ_RESUMED_READ_AHEAD
  610. this._readableState.updateNextTick()
  611. return this
  612. }
  613. pause () {
  614. this._duplexState &= (this._readableState.readAhead === false ? READ_PAUSED_NO_READ_AHEAD : READ_PAUSED)
  615. return this
  616. }
  617. static _fromAsyncIterator (ite, opts) {
  618. let destroy
  619. const rs = new Readable({
  620. ...opts,
  621. read (cb) {
  622. ite.next().then(push).then(cb.bind(null, null)).catch(cb)
  623. },
  624. predestroy () {
  625. destroy = ite.return()
  626. },
  627. destroy (cb) {
  628. if (!destroy) return cb(null)
  629. destroy.then(cb.bind(null, null)).catch(cb)
  630. }
  631. })
  632. return rs
  633. function push (data) {
  634. if (data.done) rs.push(null)
  635. else rs.push(data.value)
  636. }
  637. }
  638. static from (data, opts) {
  639. if (isReadStreamx(data)) return data
  640. if (data[asyncIterator]) return this._fromAsyncIterator(data[asyncIterator](), opts)
  641. if (!Array.isArray(data)) data = data === undefined ? [] : [data]
  642. let i = 0
  643. return new Readable({
  644. ...opts,
  645. read (cb) {
  646. this.push(i === data.length ? null : data[i++])
  647. cb(null)
  648. }
  649. })
  650. }
  651. static isBackpressured (rs) {
  652. return (rs._duplexState & READ_BACKPRESSURE_STATUS) !== 0 || rs._readableState.buffered >= rs._readableState.highWaterMark
  653. }
  654. static isPaused (rs) {
  655. return (rs._duplexState & READ_RESUMED) === 0
  656. }
  657. [asyncIterator] () {
  658. const stream = this
  659. let error = null
  660. let promiseResolve = null
  661. let promiseReject = null
  662. this.on('error', (err) => { error = err })
  663. this.on('readable', onreadable)
  664. this.on('close', onclose)
  665. return {
  666. [asyncIterator] () {
  667. return this
  668. },
  669. next () {
  670. return new Promise(function (resolve, reject) {
  671. promiseResolve = resolve
  672. promiseReject = reject
  673. const data = stream.read()
  674. if (data !== null) ondata(data)
  675. else if ((stream._duplexState & DESTROYED) !== 0) ondata(null)
  676. })
  677. },
  678. return () {
  679. return destroy(null)
  680. },
  681. throw (err) {
  682. return destroy(err)
  683. }
  684. }
  685. function onreadable () {
  686. if (promiseResolve !== null) ondata(stream.read())
  687. }
  688. function onclose () {
  689. if (promiseResolve !== null) ondata(null)
  690. }
  691. function ondata (data) {
  692. if (promiseReject === null) return
  693. if (error) promiseReject(error)
  694. else if (data === null && (stream._duplexState & READ_DONE) === 0) promiseReject(STREAM_DESTROYED)
  695. else promiseResolve({ value: data, done: data === null })
  696. promiseReject = promiseResolve = null
  697. }
  698. function destroy (err) {
  699. stream.destroy(err)
  700. return new Promise((resolve, reject) => {
  701. if (stream._duplexState & DESTROYED) return resolve({ value: undefined, done: true })
  702. stream.once('close', function () {
  703. if (err) reject(err)
  704. else resolve({ value: undefined, done: true })
  705. })
  706. })
  707. }
  708. }
  709. }
  710. class Writable extends Stream {
  711. constructor (opts) {
  712. super(opts)
  713. this._duplexState |= OPENING | READ_DONE
  714. this._writableState = new WritableState(this, opts)
  715. if (opts) {
  716. if (opts.writev) this._writev = opts.writev
  717. if (opts.write) this._write = opts.write
  718. if (opts.final) this._final = opts.final
  719. if (opts.eagerOpen) this._writableState.updateNextTick()
  720. }
  721. }
  722. cork () {
  723. this._duplexState |= WRITE_CORKED
  724. }
  725. uncork () {
  726. this._duplexState &= WRITE_NOT_CORKED
  727. this._writableState.updateNextTick()
  728. }
  729. _writev (batch, cb) {
  730. cb(null)
  731. }
  732. _write (data, cb) {
  733. this._writableState.autoBatch(data, cb)
  734. }
  735. _final (cb) {
  736. cb(null)
  737. }
  738. static isBackpressured (ws) {
  739. return (ws._duplexState & WRITE_BACKPRESSURE_STATUS) !== 0
  740. }
  741. static drained (ws) {
  742. if (ws.destroyed) return Promise.resolve(false)
  743. const state = ws._writableState
  744. const pending = (isWritev(ws) ? Math.min(1, state.queue.length) : state.queue.length)
  745. const writes = pending + ((ws._duplexState & WRITE_WRITING) ? 1 : 0)
  746. if (writes === 0) return Promise.resolve(true)
  747. if (state.drains === null) state.drains = []
  748. return new Promise((resolve) => {
  749. state.drains.push({ writes, resolve })
  750. })
  751. }
  752. write (data) {
  753. this._writableState.updateNextTick()
  754. return this._writableState.push(data)
  755. }
  756. end (data) {
  757. this._writableState.updateNextTick()
  758. this._writableState.end(data)
  759. return this
  760. }
  761. }
  762. class Duplex extends Readable { // and Writable
  763. constructor (opts) {
  764. super(opts)
  765. this._duplexState = OPENING | (this._duplexState & READ_READ_AHEAD)
  766. this._writableState = new WritableState(this, opts)
  767. if (opts) {
  768. if (opts.writev) this._writev = opts.writev
  769. if (opts.write) this._write = opts.write
  770. if (opts.final) this._final = opts.final
  771. }
  772. }
  773. cork () {
  774. this._duplexState |= WRITE_CORKED
  775. }
  776. uncork () {
  777. this._duplexState &= WRITE_NOT_CORKED
  778. this._writableState.updateNextTick()
  779. }
  780. _writev (batch, cb) {
  781. cb(null)
  782. }
  783. _write (data, cb) {
  784. this._writableState.autoBatch(data, cb)
  785. }
  786. _final (cb) {
  787. cb(null)
  788. }
  789. write (data) {
  790. this._writableState.updateNextTick()
  791. return this._writableState.push(data)
  792. }
  793. end (data) {
  794. this._writableState.updateNextTick()
  795. this._writableState.end(data)
  796. return this
  797. }
  798. }
  799. class Transform extends Duplex {
  800. constructor (opts) {
  801. super(opts)
  802. this._transformState = new TransformState(this)
  803. if (opts) {
  804. if (opts.transform) this._transform = opts.transform
  805. if (opts.flush) this._flush = opts.flush
  806. }
  807. }
  808. _write (data, cb) {
  809. if (this._readableState.buffered >= this._readableState.highWaterMark) {
  810. this._transformState.data = data
  811. } else {
  812. this._transform(data, this._transformState.afterTransform)
  813. }
  814. }
  815. _read (cb) {
  816. if (this._transformState.data !== null) {
  817. const data = this._transformState.data
  818. this._transformState.data = null
  819. cb(null)
  820. this._transform(data, this._transformState.afterTransform)
  821. } else {
  822. cb(null)
  823. }
  824. }
  825. destroy (err) {
  826. super.destroy(err)
  827. if (this._transformState.data !== null) {
  828. this._transformState.data = null
  829. this._transformState.afterTransform()
  830. }
  831. }
  832. _transform (data, cb) {
  833. cb(null, data)
  834. }
  835. _flush (cb) {
  836. cb(null)
  837. }
  838. _final (cb) {
  839. this._transformState.afterFinal = cb
  840. this._flush(transformAfterFlush.bind(this))
  841. }
  842. }
  843. class PassThrough extends Transform {}
  844. function transformAfterFlush (err, data) {
  845. const cb = this._transformState.afterFinal
  846. if (err) return cb(err)
  847. if (data !== null && data !== undefined) this.push(data)
  848. this.push(null)
  849. cb(null)
  850. }
  851. function pipelinePromise (...streams) {
  852. return new Promise((resolve, reject) => {
  853. return pipeline(...streams, (err) => {
  854. if (err) return reject(err)
  855. resolve()
  856. })
  857. })
  858. }
  859. function pipeline (stream, ...streams) {
  860. const all = Array.isArray(stream) ? [...stream, ...streams] : [stream, ...streams]
  861. const done = (all.length && typeof all[all.length - 1] === 'function') ? all.pop() : null
  862. if (all.length < 2) throw new Error('Pipeline requires at least 2 streams')
  863. let src = all[0]
  864. let dest = null
  865. let error = null
  866. for (let i = 1; i < all.length; i++) {
  867. dest = all[i]
  868. if (isStreamx(src)) {
  869. src.pipe(dest, onerror)
  870. } else {
  871. errorHandle(src, true, i > 1, onerror)
  872. src.pipe(dest)
  873. }
  874. src = dest
  875. }
  876. if (done) {
  877. let fin = false
  878. const autoDestroy = isStreamx(dest) || !!(dest._writableState && dest._writableState.autoDestroy)
  879. dest.on('error', (err) => {
  880. if (error === null) error = err
  881. })
  882. dest.on('finish', () => {
  883. fin = true
  884. if (!autoDestroy) done(error)
  885. })
  886. if (autoDestroy) {
  887. dest.on('close', () => done(error || (fin ? null : PREMATURE_CLOSE)))
  888. }
  889. }
  890. return dest
  891. function errorHandle (s, rd, wr, onerror) {
  892. s.on('error', onerror)
  893. s.on('close', onclose)
  894. function onclose () {
  895. if (rd && s._readableState && !s._readableState.ended) return onerror(PREMATURE_CLOSE)
  896. if (wr && s._writableState && !s._writableState.ended) return onerror(PREMATURE_CLOSE)
  897. }
  898. }
  899. function onerror (err) {
  900. if (!err || error) return
  901. error = err
  902. for (const s of all) {
  903. s.destroy(err)
  904. }
  905. }
  906. }
  907. function echo (s) {
  908. return s
  909. }
  910. function isStream (stream) {
  911. return !!stream._readableState || !!stream._writableState
  912. }
  913. function isStreamx (stream) {
  914. return typeof stream._duplexState === 'number' && isStream(stream)
  915. }
  916. function isEnded (stream) {
  917. return !!stream._readableState && stream._readableState.ended
  918. }
  919. function isFinished (stream) {
  920. return !!stream._writableState && stream._writableState.ended
  921. }
  922. function getStreamError (stream, opts = {}) {
  923. const err = (stream._readableState && stream._readableState.error) || (stream._writableState && stream._writableState.error)
  924. // avoid implicit errors by default
  925. return (!opts.all && err === STREAM_DESTROYED) ? null : err
  926. }
  927. function isReadStreamx (stream) {
  928. return isStreamx(stream) && stream.readable
  929. }
  930. function isDisturbed (stream) {
  931. return (stream._duplexState & OPENING) !== OPENING || (stream._duplexState & ACTIVE_OR_TICKING) !== 0
  932. }
  933. function isTypedArray (data) {
  934. return typeof data === 'object' && data !== null && typeof data.byteLength === 'number'
  935. }
  936. function defaultByteLength (data) {
  937. return isTypedArray(data) ? data.byteLength : 1024
  938. }
  939. function noop () {}
  940. function abort () {
  941. this.destroy(new Error('Stream aborted.'))
  942. }
  943. function isWritev (s) {
  944. return s._writev !== Writable.prototype._writev && s._writev !== Duplex.prototype._writev
  945. }
  946. module.exports = {
  947. pipeline,
  948. pipelinePromise,
  949. isStream,
  950. isStreamx,
  951. isEnded,
  952. isFinished,
  953. isDisturbed,
  954. getStreamError,
  955. Stream,
  956. Writable,
  957. Readable,
  958. Duplex,
  959. Transform,
  960. // Export PassThrough for compatibility with Node.js core's stream module
  961. PassThrough
  962. }