compose.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. 'use strict'
  2. const { pipeline } = require('./pipeline')
  3. const Duplex = require('./duplex')
  4. const { destroyer } = require('./destroy')
  5. const {
  6. isNodeStream,
  7. isReadable,
  8. isWritable,
  9. isWebStream,
  10. isTransformStream,
  11. isWritableStream,
  12. isReadableStream
  13. } = require('./utils')
  14. const {
  15. AbortError,
  16. codes: { ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS }
  17. } = require('../../ours/errors')
  18. const eos = require('./end-of-stream')
  19. module.exports = function compose(...streams) {
  20. if (streams.length === 0) {
  21. throw new ERR_MISSING_ARGS('streams')
  22. }
  23. if (streams.length === 1) {
  24. return Duplex.from(streams[0])
  25. }
  26. const orgStreams = [...streams]
  27. if (typeof streams[0] === 'function') {
  28. streams[0] = Duplex.from(streams[0])
  29. }
  30. if (typeof streams[streams.length - 1] === 'function') {
  31. const idx = streams.length - 1
  32. streams[idx] = Duplex.from(streams[idx])
  33. }
  34. for (let n = 0; n < streams.length; ++n) {
  35. if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
  36. // TODO(ronag): Add checks for non streams.
  37. continue
  38. }
  39. if (
  40. n < streams.length - 1 &&
  41. !(isReadable(streams[n]) || isReadableStream(streams[n]) || isTransformStream(streams[n]))
  42. ) {
  43. throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be readable')
  44. }
  45. if (n > 0 && !(isWritable(streams[n]) || isWritableStream(streams[n]) || isTransformStream(streams[n]))) {
  46. throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be writable')
  47. }
  48. }
  49. let ondrain
  50. let onfinish
  51. let onreadable
  52. let onclose
  53. let d
  54. function onfinished(err) {
  55. const cb = onclose
  56. onclose = null
  57. if (cb) {
  58. cb(err)
  59. } else if (err) {
  60. d.destroy(err)
  61. } else if (!readable && !writable) {
  62. d.destroy()
  63. }
  64. }
  65. const head = streams[0]
  66. const tail = pipeline(streams, onfinished)
  67. const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head))
  68. const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail))
  69. // TODO(ronag): Avoid double buffering.
  70. // Implement Writable/Readable/Duplex traits.
  71. // See, https://github.com/nodejs/node/pull/33515.
  72. d = new Duplex({
  73. // TODO (ronag): highWaterMark?
  74. writableObjectMode: !!(head !== null && head !== undefined && head.writableObjectMode),
  75. readableObjectMode: !!(tail !== null && tail !== undefined && tail.readableObjectMode),
  76. writable,
  77. readable
  78. })
  79. if (writable) {
  80. if (isNodeStream(head)) {
  81. d._write = function (chunk, encoding, callback) {
  82. if (head.write(chunk, encoding)) {
  83. callback()
  84. } else {
  85. ondrain = callback
  86. }
  87. }
  88. d._final = function (callback) {
  89. head.end()
  90. onfinish = callback
  91. }
  92. head.on('drain', function () {
  93. if (ondrain) {
  94. const cb = ondrain
  95. ondrain = null
  96. cb()
  97. }
  98. })
  99. } else if (isWebStream(head)) {
  100. const writable = isTransformStream(head) ? head.writable : head
  101. const writer = writable.getWriter()
  102. d._write = async function (chunk, encoding, callback) {
  103. try {
  104. await writer.ready
  105. writer.write(chunk).catch(() => {})
  106. callback()
  107. } catch (err) {
  108. callback(err)
  109. }
  110. }
  111. d._final = async function (callback) {
  112. try {
  113. await writer.ready
  114. writer.close().catch(() => {})
  115. onfinish = callback
  116. } catch (err) {
  117. callback(err)
  118. }
  119. }
  120. }
  121. const toRead = isTransformStream(tail) ? tail.readable : tail
  122. eos(toRead, () => {
  123. if (onfinish) {
  124. const cb = onfinish
  125. onfinish = null
  126. cb()
  127. }
  128. })
  129. }
  130. if (readable) {
  131. if (isNodeStream(tail)) {
  132. tail.on('readable', function () {
  133. if (onreadable) {
  134. const cb = onreadable
  135. onreadable = null
  136. cb()
  137. }
  138. })
  139. tail.on('end', function () {
  140. d.push(null)
  141. })
  142. d._read = function () {
  143. while (true) {
  144. const buf = tail.read()
  145. if (buf === null) {
  146. onreadable = d._read
  147. return
  148. }
  149. if (!d.push(buf)) {
  150. return
  151. }
  152. }
  153. }
  154. } else if (isWebStream(tail)) {
  155. const readable = isTransformStream(tail) ? tail.readable : tail
  156. const reader = readable.getReader()
  157. d._read = async function () {
  158. while (true) {
  159. try {
  160. const { value, done } = await reader.read()
  161. if (!d.push(value)) {
  162. return
  163. }
  164. if (done) {
  165. d.push(null)
  166. return
  167. }
  168. } catch {
  169. return
  170. }
  171. }
  172. }
  173. }
  174. }
  175. d._destroy = function (err, callback) {
  176. if (!err && onclose !== null) {
  177. err = new AbortError()
  178. }
  179. onreadable = null
  180. ondrain = null
  181. onfinish = null
  182. if (onclose === null) {
  183. callback(err)
  184. } else {
  185. onclose = callback
  186. if (isNodeStream(tail)) {
  187. destroyer(tail, err)
  188. }
  189. }
  190. }
  191. return d
  192. }