| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- 'use strict'
- const { pipeline } = require('./pipeline')
- const Duplex = require('./duplex')
- const { destroyer } = require('./destroy')
- const {
- isNodeStream,
- isReadable,
- isWritable,
- isWebStream,
- isTransformStream,
- isWritableStream,
- isReadableStream
- } = require('./utils')
- const {
- AbortError,
- codes: { ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS }
- } = require('../../ours/errors')
- const eos = require('./end-of-stream')
- module.exports = function compose(...streams) {
- if (streams.length === 0) {
- throw new ERR_MISSING_ARGS('streams')
- }
- if (streams.length === 1) {
- return Duplex.from(streams[0])
- }
- const orgStreams = [...streams]
- if (typeof streams[0] === 'function') {
- streams[0] = Duplex.from(streams[0])
- }
- if (typeof streams[streams.length - 1] === 'function') {
- const idx = streams.length - 1
- streams[idx] = Duplex.from(streams[idx])
- }
- for (let n = 0; n < streams.length; ++n) {
- if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
- // TODO(ronag): Add checks for non streams.
- continue
- }
- if (
- n < streams.length - 1 &&
- !(isReadable(streams[n]) || isReadableStream(streams[n]) || isTransformStream(streams[n]))
- ) {
- throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be readable')
- }
- if (n > 0 && !(isWritable(streams[n]) || isWritableStream(streams[n]) || isTransformStream(streams[n]))) {
- throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be writable')
- }
- }
- let ondrain
- let onfinish
- let onreadable
- let onclose
- let d
- function onfinished(err) {
- const cb = onclose
- onclose = null
- if (cb) {
- cb(err)
- } else if (err) {
- d.destroy(err)
- } else if (!readable && !writable) {
- d.destroy()
- }
- }
- const head = streams[0]
- const tail = pipeline(streams, onfinished)
- const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head))
- const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail))
- // TODO(ronag): Avoid double buffering.
- // Implement Writable/Readable/Duplex traits.
- // See, https://github.com/nodejs/node/pull/33515.
- d = new Duplex({
- // TODO (ronag): highWaterMark?
- writableObjectMode: !!(head !== null && head !== undefined && head.writableObjectMode),
- readableObjectMode: !!(tail !== null && tail !== undefined && tail.readableObjectMode),
- writable,
- readable
- })
- if (writable) {
- if (isNodeStream(head)) {
- d._write = function (chunk, encoding, callback) {
- if (head.write(chunk, encoding)) {
- callback()
- } else {
- ondrain = callback
- }
- }
- d._final = function (callback) {
- head.end()
- onfinish = callback
- }
- head.on('drain', function () {
- if (ondrain) {
- const cb = ondrain
- ondrain = null
- cb()
- }
- })
- } else if (isWebStream(head)) {
- const writable = isTransformStream(head) ? head.writable : head
- const writer = writable.getWriter()
- d._write = async function (chunk, encoding, callback) {
- try {
- await writer.ready
- writer.write(chunk).catch(() => {})
- callback()
- } catch (err) {
- callback(err)
- }
- }
- d._final = async function (callback) {
- try {
- await writer.ready
- writer.close().catch(() => {})
- onfinish = callback
- } catch (err) {
- callback(err)
- }
- }
- }
- const toRead = isTransformStream(tail) ? tail.readable : tail
- eos(toRead, () => {
- if (onfinish) {
- const cb = onfinish
- onfinish = null
- cb()
- }
- })
- }
- if (readable) {
- if (isNodeStream(tail)) {
- tail.on('readable', function () {
- if (onreadable) {
- const cb = onreadable
- onreadable = null
- cb()
- }
- })
- tail.on('end', function () {
- d.push(null)
- })
- d._read = function () {
- while (true) {
- const buf = tail.read()
- if (buf === null) {
- onreadable = d._read
- return
- }
- if (!d.push(buf)) {
- return
- }
- }
- }
- } else if (isWebStream(tail)) {
- const readable = isTransformStream(tail) ? tail.readable : tail
- const reader = readable.getReader()
- d._read = async function () {
- while (true) {
- try {
- const { value, done } = await reader.read()
- if (!d.push(value)) {
- return
- }
- if (done) {
- d.push(null)
- return
- }
- } catch {
- return
- }
- }
- }
- }
- }
- d._destroy = function (err, callback) {
- if (!err && onclose !== null) {
- err = new AbortError()
- }
- onreadable = null
- ondrain = null
- onfinish = null
- if (onclose === null) {
- callback(err)
- } else {
- onclose = callback
- if (isNodeStream(tail)) {
- destroyer(tail, err)
- }
- }
- }
- return d
- }
|