| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- 'use strict'
- /* replacement start */
- const process = require('process/')
- /* replacement end */
- const { PromisePrototypeThen, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials')
- const { Buffer } = require('buffer')
- const { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } = require('../../ours/errors').codes
/**
 * Creates a readable stream whose chunks come from `iterable`.
 *
 * - A string or Buffer is pushed as one single chunk, followed by `null`.
 * - An object exposing `Symbol.asyncIterator` is consumed through the async
 *   iteration protocol; otherwise `Symbol.iterator` is used. The async
 *   protocol is checked first.
 * - Values that look like thenables (`typeof value.then === 'function'`) are
 *   awaited before being pushed.
 *
 * @param {Function} Readable - Constructor invoked as `new Readable(options)`.
 * @param {*} iterable - String, Buffer, iterable or async iterable to read from.
 * @param {object} [opts] - Extra stream options. They are spread after the
 *   defaults (`objectMode: true`, plus `highWaterMark: 1` for iterators), so
 *   they take precedence over them.
 * @returns {object} The instance produced by `new Readable(...)`.
 * @throws {ERR_INVALID_ARG_TYPE} When `iterable` is neither a string, a Buffer,
 *   nor an object exposing one of the two iterator symbols.
 */
function from(Readable, iterable, opts) {
  // Strings and Buffers are emitted whole instead of being iterated.
  const isSingleChunk = typeof iterable === 'string' || iterable instanceof Buffer
  if (isSingleChunk) {
    const singleChunkOptions = {
      objectMode: true,
      ...opts,
      // Declared after the spread, so this `read` wins over any `opts.read`.
      read() {
        this.push(iterable)
        this.push(null)
      }
    }
    return new Readable(singleChunkOptions)
  }

  // True when `iterable` is truthy and carries a truthy member under `protocolKey`.
  const exposes = (protocolKey) => Boolean(iterable) && Boolean(iterable[protocolKey])

  let source
  let usesAsyncProtocol
  if (exposes(SymbolAsyncIterator)) {
    usesAsyncProtocol = true
    source = iterable[SymbolAsyncIterator]()
  } else if (exposes(SymbolIterator)) {
    usesAsyncProtocol = false
    source = iterable[SymbolIterator]()
  } else {
    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable)
  }

  const readable = new Readable({
    objectMode: true,
    highWaterMark: 1,
    // TODO(ronag): What options should be allowed?
    ...opts
  })

  // Set while a pump() run is in flight, so that a _read arriving before the
  // previous iteration has completed does not start a second, concurrent run.
  let pumping = false

  /**
   * Pulls values from the iterator and pushes them into the stream until the
   * iterator is done, push() reports back-pressure, or an error occurs.
   * Any error raised along the way destroys the stream.
   */
  async function pump() {
    let wantsMore = true
    while (wantsMore) {
      // Assume this is the last round; only a successful push re-arms the loop.
      wantsMore = false
      try {
        const step = usesAsyncProtocol ? await source.next() : source.next()
        const { value, done } = step
        if (done) {
          readable.push(null)
        } else {
          let chunk = value
          if (value && typeof value.then === 'function') {
            chunk = await value
          }
          if (chunk === null) {
            // A null chunk is rejected with an error rather than pushed.
            pumping = false
            throw new ERR_STREAM_NULL_VALUES()
          }
          wantsMore = readable.push(chunk)
          if (!wantsMore) {
            // Back-pressure: stop here and let the next _read resume pumping.
            pumping = false
          }
        }
      } catch (err) {
        readable.destroy(err)
      }
    }
  }

  /**
   * Gives the iterator a chance to clean up when the stream is destroyed.
   * With an error present and a `throw` method available, the error is
   * forwarded through `throw` first; unless that reports `done`, `return` is
   * called afterwards when it exists.
   */
  async function finishIterator(error) {
    const hadError = !(error === undefined || error === null)
    const canThrow = typeof source.throw === 'function'
    if (hadError && canThrow) {
      const { value: pending, done: finished } = await source.throw(error)
      await pending
      if (finished) {
        return
      }
    }
    if (typeof source.return === 'function') {
      const { value: pending } = await source.return()
      await pending
    }
  }

  readable._read = function () {
    if (pumping) {
      return
    }
    pumping = true
    pump()
  }

  readable._destroy = function (error, cb) {
    const cleanup = finishIterator(error)
    // cb is deferred with nextTick in case it throws.
    const onClosed = () => process.nextTick(cb, error)
    const onCloseFailed = (closeError) => process.nextTick(cb, closeError || error)
    PromisePrototypeThen(cleanup, onClosed, onCloseFailed)
  }

  return readable
}
- module.exports = from
|