| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- 'use strict'
- const AbortController = globalThis.AbortController || require('abort-controller').AbortController
- const {
- codes: { ERR_INVALID_ARG_VALUE, ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE },
- AbortError
- } = require('../../ours/errors')
- const { validateAbortSignal, validateInteger, validateObject } = require('../validators')
- const kWeakHandler = require('../../ours/primordials').Symbol('kWeak')
- const kResistStopPropagation = require('../../ours/primordials').Symbol('kResistStopPropagation')
- const { finished } = require('./end-of-stream')
- const staticCompose = require('./compose')
- const { addAbortSignalNoValidate } = require('./add-abort-signal')
- const { isWritable, isNodeStream } = require('./utils')
- const { deprecate } = require('../../ours/util')
- const {
- ArrayPrototypePush,
- Boolean,
- MathFloor,
- Number,
- NumberIsNaN,
- Promise,
- PromiseReject,
- PromiseResolve,
- PromisePrototypeThen,
- Symbol
- } = require('../../ours/primordials')
- const kEmpty = Symbol('kEmpty')
- const kEof = Symbol('kEof')
/**
 * Combines this stream with `stream` via the static compose helper and
 * optionally ties the resulting stream to an abort signal.
 *
 * @param {*} stream - Target to compose with; when it is a Node stream it must be writable.
 * @param {{ signal?: AbortSignal }} [options]
 * @returns {*} Whatever `staticCompose(this, stream)` produces.
 * @throws {ERR_INVALID_ARG_VALUE} When `stream` is a Node stream that is not writable.
 */
function compose(stream, options) {
  const hasOptions = options != null
  if (hasOptions) {
    validateObject(options, 'options')
  }
  if (hasOptions && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  const isUnwritableNodeStream = isNodeStream(stream) && !isWritable(stream)
  if (isUnwritableNodeStream) {
    throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable')
  }

  const pipeline = staticCompose(this, stream)
  if (hasOptions && options.signal) {
    // The signal was validated above, so the non-validating variant is enough.
    addAbortSignalNoValidate(options.signal, pipeline)
  }
  return pipeline
}
/**
 * Lazily applies `fn` to every chunk of this stream and returns an async
 * generator over the results, in source order.
 *
 * Up to `options.concurrency` calls (default 1) may be pending at once, and
 * the internal queue holds at most `highWaterMark + concurrency` results
 * (`options.highWaterMark` defaults to `concurrency - 1`). A result equal to
 * the `kEmpty` sentinel is consumed but never yielded.
 *
 * @param {Function} fn - Called as `fn(chunk, { signal })`; may be async.
 * @param {{ signal?: AbortSignal, concurrency?: number, highWaterMark?: number }} [options]
 * @returns {AsyncGenerator}
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
function map(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }
  const hasOptions = options != null
  if (hasOptions) {
    validateObject(options, 'options')
  }
  if (hasOptions && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  let concurrency = 1
  if (hasOptions && options.concurrency != null) {
    concurrency = MathFloor(options.concurrency)
  }
  let highWaterMark = concurrency - 1
  if (hasOptions && options.highWaterMark != null) {
    highWaterMark = MathFloor(options.highWaterMark)
  }
  validateInteger(concurrency, 'options.concurrency', 1)
  validateInteger(highWaterMark, 'options.highWaterMark', 0)
  // Total number of queued results allowed before the producer pauses.
  const queueLimit = highWaterMark + concurrency

  return async function* map() {
    const util = require('../../ours/util')
    const signal = util.AbortSignalAny([hasOptions ? options.signal : undefined].filter(Boolean))
    const stream = this
    const queue = []
    // The same options object is handed to every `fn` invocation.
    const signalOpt = { signal }

    let wakeConsumer // resolves the consumer's wait for new queue entries
    let wakeProducer // resolves the producer's back-pressure wait
    let stopped = false
    let inFlight = 0

    function resumeProducerIfPossible() {
      if (!wakeProducer || stopped || inFlight >= concurrency || queue.length >= queueLimit) {
        return
      }
      wakeProducer()
      wakeProducer = null
    }

    function onSettled() {
      inFlight -= 1
      resumeProducerIfPossible()
    }

    function onRejected() {
      stopped = true
      onSettled()
    }

    function notifyConsumer() {
      if (wakeConsumer) {
        wakeConsumer()
        wakeConsumer = null
      }
    }

    // Producer: reads the source, starts `fn` for each chunk and queues the
    // resulting promise, pausing while the queue or concurrency limit is hit.
    async function pump() {
      try {
        for await (const chunk of stream) {
          if (stopped) {
            return
          }
          if (signal.aborted) {
            throw new AbortError()
          }

          let pending
          try {
            const mapped = fn(chunk, signalOpt)
            if (mapped === kEmpty) {
              continue
            }
            pending = PromiseResolve(mapped)
          } catch (err) {
            // A synchronous failure is queued as a rejection so it surfaces in order.
            pending = PromiseReject(err)
          }

          inFlight += 1
          PromisePrototypeThen(pending, onSettled, onRejected)
          queue.push(pending)
          notifyConsumer()

          const mustWait = !stopped && (queue.length >= queueLimit || inFlight >= concurrency)
          if (mustWait) {
            await new Promise((resolve) => {
              wakeProducer = resolve
            })
          }
        }
        queue.push(kEof)
      } catch (err) {
        const failure = PromiseReject(err)
        PromisePrototypeThen(failure, onSettled, onRejected)
        queue.push(failure)
      } finally {
        stopped = true
        notifyConsumer()
      }
    }

    pump()

    // Consumer: drains the queue head-first so results keep source order.
    try {
      for (;;) {
        while (queue.length > 0) {
          const result = await queue[0]
          if (result === kEof) {
            return
          }
          if (signal.aborted) {
            throw new AbortError()
          }
          if (result !== kEmpty) {
            yield result
          }
          queue.shift()
          resumeProducerIfPossible()
        }
        await new Promise((resolve) => {
          wakeConsumer = resolve
        })
      }
    } finally {
      // Make sure a paused producer is released so it can observe `stopped`.
      stopped = true
      if (wakeProducer) {
        wakeProducer()
        wakeProducer = null
      }
    }
  }.call(this)
}
/**
 * Returns an async generator yielding `[index, chunk]` pairs for this stream,
 * with `index` starting at 0.
 *
 * @param {{ signal?: AbortSignal }} [options]
 * @returns {AsyncGenerator<[number, *]>}
 * @throws {AbortError} From the generator when `options.signal` is aborted;
 *   the signal's `reason` is attached as `cause`.
 */
function asIndexedPairs(options = undefined) {
  if (options != null) {
    validateObject(options, 'options')
  }
  if (options != null && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }
  return async function* asIndexedPairs() {
    let index = 0
    for await (const val of this) {
      if (options != null && options.signal != null && options.signal.aborted) {
        // The options bag is AbortError's second argument (as used by reduce
        // and toArray in this file); passing it first would turn it into the
        // message and drop the cause.
        throw new AbortError(undefined, {
          cause: options.signal.reason
        })
      }
      yield [index++, val]
    }
  }.call(this)
}
/**
 * Resolves to `true` as soon as one chunk passes `fn`, otherwise `false`.
 * Implemented on top of `filter`: the first element it yields is a match.
 *
 * @param {Function} fn - Predicate, validated by `filter`.
 * @param {object} [options] - Forwarded to `filter`.
 * @returns {Promise<boolean>}
 */
async function some(fn, options = undefined) {
  const matches = filter.call(this, fn, options)
  // eslint-disable-next-line no-unused-vars
  for await (const firstMatch of matches) {
    return true
  }
  return false
}
/**
 * Resolves to `true` when every chunk passes `fn`.
 *
 * Expressed through `some` using De Morgan's law: "all pass" is the same as
 * "none fails" (https://en.wikipedia.org/wiki/De_Morgan%27s_laws).
 *
 * @param {Function} fn - Predicate; may be async.
 * @param {object} [options] - Forwarded to `some`.
 * @returns {Promise<boolean>}
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
async function every(fn, options = undefined) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }
  const fails = async (...args) => {
    const passed = await fn(...args)
    return !passed
  }
  const anyFailed = await some.call(this, fails, options)
  return !anyFailed
}
/**
 * Resolves to the first chunk that passes `fn`, or `undefined` if none does.
 *
 * @param {Function} fn - Predicate, validated by `filter`.
 * @param {object} [options] - Forwarded to `filter`.
 * @returns {Promise<*>}
 */
async function find(fn, options) {
  const matches = filter.call(this, fn, options)
  for await (const firstMatch of matches) {
    return firstMatch
  }
  return undefined
}
/**
 * Calls `fn` for every chunk of this stream and resolves once the stream is
 * exhausted. Runs through `map`, so `options` is interpreted there.
 *
 * @param {Function} fn - Called as `fn(chunk, options)`; may be async.
 * @param {object} [options] - Forwarded to `map`.
 * @returns {Promise<void>}
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
async function forEach(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }
  // Always resolve to kEmpty so `map` has nothing to yield.
  const visit = async (value, callOptions) => {
    await fn(value, callOptions)
    return kEmpty
  }
  // eslint-disable-next-line no-unused-vars
  for await (const unused of map.call(this, visit, options)) {
    // Intentionally empty: iterating only drives the mapper.
  }
}
/**
 * Returns an async generator over the chunks for which `fn` is truthy.
 * Rejected chunks are mapped to the `kEmpty` sentinel, which `map` skips.
 *
 * @param {Function} fn - Predicate called as `fn(chunk, options)`; may be async.
 * @param {object} [options] - Forwarded to `map`.
 * @returns {AsyncGenerator}
 * @throws {ERR_INVALID_ARG_TYPE} When `fn` is not a function.
 */
function filter(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }
  const keepOrSkip = async (value, callOptions) => {
    const keep = await fn(value, callOptions)
    return keep ? value : kEmpty
  }
  return map.call(this, keepOrSkip, options)
}
/**
 * Variant of ERR_MISSING_ARGS raised by `reduce`.
 *
 * The initial value only counts as "missing" when the stream turned out to be
 * empty, so the generic error type still fits but the message is replaced
 * with one that explains that situation.
 */
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
  constructor() {
    super('reduce')
    // Replace the generic text produced by the parent constructor.
    this.message = 'Reduce of an empty stream requires an initial value'
  }
}
/**
 * Folds the chunks of this stream into a single value.
 *
 * When no initial value is passed (detected via `arguments.length`), the
 * first chunk seeds the accumulator and the reducer starts with the second.
 *
 * @param {Function} reducer - Called as `reducer(acc, chunk, { signal })`; may
 *   be async. `signal` is an internal signal that aborts when the caller's
 *   signal aborts or when reduce finishes.
 * @param {*} [initialValue]
 * @param {{ signal?: AbortSignal }} [options]
 * @returns {Promise<*>} The final accumulator.
 * @throws {ERR_INVALID_ARG_TYPE} When `reducer` is not a function.
 * @throws {AbortError} When `options.signal` is (or becomes) aborted; the
 *   signal's `reason` is attached as `cause`.
 * @throws {ReduceAwareErrMissingArgs} When the stream is empty and no initial
 *   value was supplied.
 */
async function reduce(reducer, initialValue, options) {
  if (typeof reducer !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('reducer', ['Function', 'AsyncFunction'], reducer)
  }
  if (options != null) {
    validateObject(options, 'options')
  }
  const userSignal = options != null ? options.signal : undefined
  if (userSignal != null) {
    validateAbortSignal(userSignal, 'options.signal')
  }
  let hasInitialValue = arguments.length > 1

  if (userSignal != null && userSignal.aborted) {
    const err = new AbortError(undefined, {
      cause: userSignal.reason
    })
    this.once('error', () => {}) // The error is already propagated
    await finished(this.destroy(err))
    throw err
  }

  const ac = new AbortController()
  const signal = ac.signal
  let onAbort
  if (userSignal) {
    onAbort = () => ac.abort()
    const opts = {
      once: true,
      [kWeakHandler]: this,
      [kResistStopPropagation]: true
    }
    userSignal.addEventListener('abort', onAbort, opts)
  }

  let gotAnyItemFromStream = false
  try {
    for await (const value of this) {
      gotAnyItemFromStream = true
      if (userSignal != null && userSignal.aborted) {
        // Carry the abort reason, matching the pre-check above.
        throw new AbortError(undefined, {
          cause: userSignal.reason
        })
      }
      if (!hasInitialValue) {
        initialValue = value
        hasInitialValue = true
      } else {
        initialValue = await reducer(initialValue, value, {
          signal
        })
      }
    }
    if (!gotAnyItemFromStream && !hasInitialValue) {
      throw new ReduceAwareErrMissingArgs()
    }
  } finally {
    // `once` only detaches the listener if the signal actually fires, so
    // remove it explicitly; otherwise every reduce() call on a long-lived
    // signal would leave a listener behind.
    if (onAbort && typeof userSignal.removeEventListener === 'function') {
      userSignal.removeEventListener('abort', onAbort)
    }
    ac.abort()
  }
  return initialValue
}
/**
 * Collects every chunk of this stream into an array.
 *
 * @param {{ signal?: AbortSignal }} [options]
 * @returns {Promise<Array>} The chunks in the order they were read.
 * @throws {AbortError} When `options.signal` is aborted while reading; the
 *   signal's `reason` is attached as `cause`.
 */
async function toArray(options) {
  const hasOptions = options != null
  if (hasOptions) {
    validateObject(options, 'options')
  }
  if (hasOptions && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  const collected = []
  for await (const chunk of this) {
    const currentSignal = hasOptions ? options.signal : undefined
    if (currentSignal != null && currentSignal.aborted) {
      throw new AbortError(undefined, {
        cause: options.signal.reason
      })
    }
    ArrayPrototypePush(collected, chunk)
  }
  return collected
}
/**
 * Maps every chunk with `fn` (through `map`) and flattens each result into
 * the output by delegating to it with `yield*`.
 *
 * @param {Function} fn - Mapper, validated by `map`; each result must be
 *   something `yield*` can delegate to inside an async generator.
 * @param {object} [options] - Forwarded to `map`.
 * @returns {AsyncGenerator}
 */
function flatMap(fn, options) {
  const mapped = map.call(this, fn, options)
  return async function* flatMap() {
    for await (const inner of mapped) {
      yield* inner
    }
  }.call(this)
}
/**
 * Converts a caller-supplied count into a non-negative integer or Infinity.
 *
 * The argument is coerced with `Number` to align with the iterator-helpers
 * proposal (https://github.com/tc39/proposal-iterator-helpers/issues/169).
 * NaN becomes 0 and fractional values are rounded down, so `take(1.5)` and
 * `drop(1.5)` act on exactly one item.
 *
 * @param {*} number
 * @returns {number} A non-negative integer, or Infinity.
 * @throws {ERR_OUT_OF_RANGE} When the coerced value is negative.
 */
function toIntegerOrInfinity(number) {
  number = Number(number)
  if (NumberIsNaN(number)) {
    return 0
  }
  // The range check runs before rounding so that every negative input,
  // including fractions such as -0.5, keeps throwing as it did before.
  if (number < 0) {
    throw new ERR_OUT_OF_RANGE('number', '>= 0', number)
  }
  // For non-negative values floor equals truncation; Infinity is unchanged.
  return MathFloor(number)
}
/**
 * Returns an async generator that skips the first `number` chunks of this
 * stream and yields the rest.
 *
 * @param {*} number - Count to skip, normalised by `toIntegerOrInfinity`.
 * @param {{ signal?: AbortSignal }} [options]
 * @returns {AsyncGenerator}
 * @throws {AbortError} From the generator when `options.signal` is aborted,
 *   checked once up front and again for every chunk.
 */
function drop(number, options = undefined) {
  const hasOptions = options != null
  if (hasOptions) {
    validateObject(options, 'options')
  }
  if (hasOptions && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }
  number = toIntegerOrInfinity(number)

  return async function* drop() {
    const isAborted = () => {
      const currentSignal = hasOptions ? options.signal : undefined
      return currentSignal != null && currentSignal.aborted
    }

    if (isAborted()) {
      throw new AbortError()
    }
    let remaining = number
    for await (const chunk of this) {
      if (isAborted()) {
        throw new AbortError()
      }
      const pastSkippedPrefix = remaining <= 0
      remaining -= 1
      if (pastSkippedPrefix) {
        yield chunk
      }
    }
  }.call(this)
}
/**
 * Returns an async generator that yields at most the first `number` chunks
 * of this stream and then stops.
 *
 * @param {*} number - Maximum count, normalised by `toIntegerOrInfinity`.
 * @param {{ signal?: AbortSignal }} [options]
 * @returns {AsyncGenerator}
 * @throws {AbortError} From the generator when `options.signal` is aborted,
 *   checked once up front and again for every chunk.
 */
function take(number, options = undefined) {
  const hasOptions = options != null
  if (hasOptions) {
    validateObject(options, 'options')
  }
  if (hasOptions && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }
  number = toIntegerOrInfinity(number)

  return async function* take() {
    const isAborted = () => {
      const currentSignal = hasOptions ? options.signal : undefined
      return currentSignal != null && currentSignal.aborted
    }

    if (isAborted()) {
      throw new AbortError()
    }
    let remaining = number
    for await (const chunk of this) {
      if (isAborted()) {
        throw new AbortError()
      }
      const withinLimit = remaining > 0
      remaining -= 1
      if (withinLimit) {
        yield chunk
      }
      // Stop here instead of pulling one more chunk once the limit is reached.
      if (remaining <= 0) {
        return
      }
    }
  }.call(this)
}
// Operators whose result is itself stream-like (an async generator or a
// composed stream). `asIndexedPairs` is exported behind a deprecation wrapper.
module.exports.streamReturningOperators = {
  asIndexedPairs: deprecate(asIndexedPairs, 'readable.asIndexedPairs will be removed in a future version.'),
  drop: drop,
  filter: filter,
  flatMap: flatMap,
  map: map,
  take: take,
  compose: compose
}

// Operators that consume the stream and settle with a single value.
module.exports.promiseReturningOperators = {
  every: every,
  forEach: forEach,
  reduce: reduce,
  toArray: toArray,
  some: some,
  find: find
}
|