| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- /* replacement start */
- const process = require('process/')
- /* replacement end */
- ;('use strict')
- const bufferModule = require('buffer')
- const {
- isReadable,
- isWritable,
- isIterable,
- isNodeStream,
- isReadableNodeStream,
- isWritableNodeStream,
- isDuplexNodeStream,
- isReadableStream,
- isWritableStream
- } = require('./utils')
- const eos = require('./end-of-stream')
- const {
- AbortError,
- codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE }
- } = require('../../ours/errors')
- const { destroyer } = require('./destroy')
- const Duplex = require('./duplex')
- const Readable = require('./readable')
- const Writable = require('./writable')
- const { createDeferredPromise } = require('../../ours/util')
- const from = require('./from')
// Use the global Blob when there is one, otherwise whatever the buffer module exports.
const Blob = globalThis.Blob || bufferModule.Blob

/**
 * Tells whether `b` is an instance of the resolved Blob constructor.
 *
 * @param {*} b - Value to test.
 * @returns {boolean} `false` whenever no Blob implementation could be resolved.
 */
function isBlob(b) {
  if (typeof Blob === 'undefined') {
    return false
  }
  return b instanceof Blob
}
- const AbortController = globalThis.AbortController || require('abort-controller').AbortController
- const { FunctionPrototypeCall } = require('../../ours/primordials')
// Needed for Node.js versions before 17: a side that is switched off through
// the constructor options is flagged here as already finished.
class Duplexify extends Duplex {
  /**
   * @param {object} [options] - Passed through unchanged to the Duplex
   *   constructor; `readable: false` / `writable: false` additionally mark
   *   the corresponding internal state as completed.
   */
  constructor(options) {
    super(options)
    // https://github.com/nodejs/node/pull/34385
    if (options != null && options.readable === false) {
      Object.assign(this._readableState, {
        readable: false,
        ended: true,
        endEmitted: true
      })
    }
    if (options != null && options.writable === false) {
      Object.assign(this._writableState, {
        writable: false,
        ending: true,
        ended: true,
        finished: true
      })
    }
  }
}
- module.exports = function duplexify(body, name) {
- if (isDuplexNodeStream(body)) {
- return body
- }
- if (isReadableNodeStream(body)) {
- return _duplexify({
- readable: body
- })
- }
- if (isWritableNodeStream(body)) {
- return _duplexify({
- writable: body
- })
- }
- if (isNodeStream(body)) {
- return _duplexify({
- writable: false,
- readable: false
- })
- }
- if (isReadableStream(body)) {
- return _duplexify({
- readable: Readable.fromWeb(body)
- })
- }
- if (isWritableStream(body)) {
- return _duplexify({
- writable: Writable.fromWeb(body)
- })
- }
- if (typeof body === 'function') {
- const { value, write, final, destroy } = fromAsyncGen(body)
- if (isIterable(value)) {
- return from(Duplexify, value, {
- // TODO (ronag): highWaterMark?
- objectMode: true,
- write,
- final,
- destroy
- })
- }
- const then = value === null || value === undefined ? undefined : value.then
- if (typeof then === 'function') {
- let d
- const promise = FunctionPrototypeCall(
- then,
- value,
- (val) => {
- if (val != null) {
- throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val)
- }
- },
- (err) => {
- destroyer(d, err)
- }
- )
- return (d = new Duplexify({
- // TODO (ronag): highWaterMark?
- objectMode: true,
- readable: false,
- write,
- final(cb) {
- final(async () => {
- try {
- await promise
- process.nextTick(cb, null)
- } catch (err) {
- process.nextTick(cb, err)
- }
- })
- },
- destroy
- }))
- }
- throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or AsyncFunction', name, value)
- }
- if (isBlob(body)) {
- return duplexify(body.arrayBuffer())
- }
- if (isIterable(body)) {
- return from(Duplexify, body, {
- // TODO (ronag): highWaterMark?
- objectMode: true,
- writable: false
- })
- }
- if (
- isReadableStream(body === null || body === undefined ? undefined : body.readable) &&
- isWritableStream(body === null || body === undefined ? undefined : body.writable)
- ) {
- return Duplexify.fromWeb(body)
- }
- if (
- typeof (body === null || body === undefined ? undefined : body.writable) === 'object' ||
- typeof (body === null || body === undefined ? undefined : body.readable) === 'object'
- ) {
- const readable =
- body !== null && body !== undefined && body.readable
- ? isReadableNodeStream(body === null || body === undefined ? undefined : body.readable)
- ? body === null || body === undefined
- ? undefined
- : body.readable
- : duplexify(body.readable)
- : undefined
- const writable =
- body !== null && body !== undefined && body.writable
- ? isWritableNodeStream(body === null || body === undefined ? undefined : body.writable)
- ? body === null || body === undefined
- ? undefined
- : body.writable
- : duplexify(body.writable)
- : undefined
- return _duplexify({
- readable,
- writable
- })
- }
- const then = body === null || body === undefined ? undefined : body.then
- if (typeof then === 'function') {
- let d
- FunctionPrototypeCall(
- then,
- body,
- (val) => {
- if (val != null) {
- d.push(val)
- }
- d.push(null)
- },
- (err) => {
- destroyer(d, err)
- }
- )
- return (d = new Duplexify({
- objectMode: true,
- writable: false,
- read() {}
- }))
- }
- throw new ERR_INVALID_ARG_TYPE(
- name,
- [
- 'Blob',
- 'ReadableStream',
- 'WritableStream',
- 'Stream',
- 'Iterable',
- 'AsyncIterable',
- 'Function',
- '{ readable, writable } pair',
- 'Promise'
- ],
- body
- )
- }
/**
 * Calls `fn` with an async iterable that yields every chunk later passed to
 * the returned `write`, together with `{ signal }`, and returns the pieces
 * needed to drive that iterable from a writable stream implementation.
 *
 * @param {Function} fn - Invoked once as `fn(asyncIterable, { signal })`.
 * @returns {{ value: *, write: Function, final: Function, destroy: Function }}
 *   `value` is whatever `fn` returned; `write(chunk, encoding, cb)` hands a
 *   chunk to the iterable; `final(cb)` ends it; `destroy(err, cb)` aborts
 *   `signal` and then calls `cb(err)`.
 */
function fromAsyncGen(fn) {
  const first = createDeferredPromise()
  // The promise the generator waits on next, and the function that settles it.
  // Each is cleared as soon as it has been picked up.
  let nextMessage = first.promise
  let settleNext = first.resolve
  const controller = new AbortController()
  const { signal } = controller

  // Hands out the current settle function and clears the slot.
  function takeSettler() {
    const settler = settleNext
    settleNext = null
    return settler
  }

  async function* writtenChunks() {
    for (;;) {
      const waitingOn = nextMessage
      nextMessage = null
      const { chunk, done, cb } = await waitingOn
      // Acknowledge the write/final on the next tick, before deciding how to go on.
      process.nextTick(cb)
      if (done) {
        return
      }
      if (signal.aborted) {
        throw new AbortError(undefined, {
          cause: signal.reason
        })
      }
      // Arm a fresh deferred for the following write before yielding this chunk.
      const following = createDeferredPromise()
      nextMessage = following.promise
      settleNext = following.resolve
      yield chunk
    }
  }

  const value = fn(writtenChunks(), {
    signal
  })

  return {
    value,
    write(chunk, encoding, cb) {
      const settle = takeSettler()
      settle({
        chunk,
        done: false,
        cb
      })
    },
    final(cb) {
      const settle = takeSettler()
      settle({
        done: true,
        cb
      })
    },
    destroy(err, cb) {
      controller.abort()
      cb(err)
    }
  }
}
/**
 * Builds a Duplexify that forwards writes to `pair.writable` and serves reads
 * from `pair.readable`.
 *
 * @param {object} pair
 * @param {*} [pair.readable] - Source side. When it is truthy but has no
 *   `read()` function it is first wrapped with `Readable.wrap`.
 * @param {*} [pair.writable] - Sink side.
 * @returns {Duplexify} Stream whose readable/writable sides are enabled only
 *   when `isReadable` / `isWritable` accept the corresponding stream.
 */
function _duplexify(pair) {
  const r = pair.readable && typeof pair.readable.read !== 'function' ? Readable.wrap(pair.readable) : pair.readable
  const w = pair.writable
  const readable = !!isReadable(r)
  const writable = !!isWritable(w)
  // Pending callbacks, each released by the matching event of `w` / `r`.
  let ondrain
  let onfinish
  let onreadable
  // `undefined` until something settles. Holds the `_destroy` callback while a
  // teardown is waiting for the underlying streams, and is `null` once one of
  // them has reported completion.
  let onclose
  let d
  function onfinished(err) {
    const cb = onclose
    onclose = null
    if (cb) {
      cb(err)
    } else if (err) {
      d.destroy(err)
    }
  }
  // TODO(ronag): Avoid double buffering.
  // Implement Writable/Readable/Duplex traits.
  // See, https://github.com/nodejs/node/pull/33515.
  d = new Duplexify({
    // TODO (ronag): highWaterMark?
    readableObjectMode: !!(r !== null && r !== undefined && r.readableObjectMode),
    writableObjectMode: !!(w !== null && w !== undefined && w.writableObjectMode),
    readable,
    writable
  })
  if (writable) {
    eos(w, (err) => {
      if (err) {
        destroyer(r, err)
      }
      onfinished(err)
    })
    d._write = function (chunk, encoding, callback) {
      if (w.write(chunk, encoding)) {
        callback()
      } else {
        // Back-pressure: acknowledge the write once `w` emits 'drain'.
        ondrain = callback
      }
    }
    d._final = function (callback) {
      w.end()
      onfinish = callback
    }
    w.on('drain', function () {
      if (ondrain) {
        const cb = ondrain
        ondrain = null
        cb()
      }
    })
    w.on('finish', function () {
      if (onfinish) {
        const cb = onfinish
        onfinish = null
        cb()
      }
    })
  }
  if (readable) {
    eos(r, (err) => {
      if (err) {
        destroyer(r, err)
        // The writable side has to be torn down here as well: `onfinished`
        // clears `onclose` before calling `d.destroy(err)`, so `_destroy`
        // takes its short path and would never reach `w`.
        destroyer(w, err)
      }
      onfinished(err)
    })
    r.on('readable', function () {
      if (onreadable) {
        const cb = onreadable
        onreadable = null
        cb()
      }
    })
    r.on('end', function () {
      d.push(null)
    })
    d._read = function () {
      while (true) {
        const buf = r.read()
        if (buf === null) {
          // Nothing buffered: resume from the next 'readable' event.
          onreadable = d._read
          return
        }
        if (!d.push(buf)) {
          return
        }
      }
    }
  }
  d._destroy = function (err, callback) {
    // Destroying without an error before either side has settled is an abort.
    if (!err && onclose !== null) {
      err = new AbortError()
    }
    onreadable = null
    ondrain = null
    onfinish = null
    if (onclose === null) {
      callback(err)
    } else {
      // Defer `callback` until `onfinished` runs for one of the streams.
      onclose = callback
      destroyer(w, err)
      destroyer(r, err)
    }
  }
  return d
}
|