operators.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. 'use strict'
  2. const AbortController = globalThis.AbortController || require('abort-controller').AbortController
  3. const {
  4. codes: { ERR_INVALID_ARG_VALUE, ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE },
  5. AbortError
  6. } = require('../../ours/errors')
  7. const { validateAbortSignal, validateInteger, validateObject } = require('../validators')
  8. const kWeakHandler = require('../../ours/primordials').Symbol('kWeak')
  9. const kResistStopPropagation = require('../../ours/primordials').Symbol('kResistStopPropagation')
  10. const { finished } = require('./end-of-stream')
  11. const staticCompose = require('./compose')
  12. const { addAbortSignalNoValidate } = require('./add-abort-signal')
  13. const { isWritable, isNodeStream } = require('./utils')
  14. const { deprecate } = require('../../ours/util')
  15. const {
  16. ArrayPrototypePush,
  17. Boolean,
  18. MathFloor,
  19. Number,
  20. NumberIsNaN,
  21. Promise,
  22. PromiseReject,
  23. PromiseResolve,
  24. PromisePrototypeThen,
  25. Symbol
  26. } = require('../../ours/primordials')
  // Sentinel a mapper returns to tell `map` that an item produces no output
  // (the wrappers built by `forEach` and `filter` in this file return it).
  27. const kEmpty = Symbol('kEmpty')
  // Sentinel `map` pushes onto its internal queue once the source is exhausted.
  28. const kEof = Symbol('kEof')
  /**
   * Combines this stream with `stream` through the static compose helper and
   * optionally ties the result to an abort signal.
   *
   * @param {*} stream - Second argument for the static compose helper. When it
   *   is a Node stream it has to be writable.
   * @param {{ signal?: AbortSignal }} [options] - Optional settings object.
   * @returns {*} The value produced by the static compose helper.
   * @throws {ERR_INVALID_ARG_VALUE} When `stream` is a Node stream that is not writable.
   */
  function compose(stream, options) {
    const hasOptions = options !== null && options !== undefined
    if (hasOptions) {
      validateObject(options, 'options')
    }
    if (hasOptions && options.signal != null) {
      validateAbortSignal(options.signal, 'options.signal')
    }

    if (isNodeStream(stream)) {
      if (!isWritable(stream)) {
        throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable')
      }
    }

    const combined = staticCompose(this, stream)
    if (!hasOptions || !options.signal) {
      return combined
    }
    // The signal was validated above, so the non-validating variant is enough.
    addAbortSignalNoValidate(options.signal, combined)
    return combined
  }
  /**
   * Returns an async generator that runs `fn` over every item read from `this`
   * and yields the results in source order, allowing several calls to be
   * pending at the same time.
   *
   * @param {Function} fn - Called as `fn(value, { signal })`; may return a
   *   promise. A result equal to `kEmpty` is dropped instead of being yielded.
   * @param {object} [options]
   * @param {AbortSignal} [options.signal] - Checked before each `fn` call and
   *   before each yield.
   * @param {number} [options.concurrency=1] - Floored, then passed to
   *   `validateInteger` with `1` (presumably the minimum - confirm against validators).
   * @param {number} [options.highWaterMark=concurrency - 1] - Floored, passed to
   *   `validateInteger` with `0`, then increased by `concurrency`.
   * @returns {AsyncGenerator} Generator already bound to `this`.
   * @throws {ERR_INVALID_ARG_TYPE} If `fn` is not a function.
   */
  46. function map(fn, options) {
  47. if (typeof fn !== 'function') {
  48. throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  49. }
  50. if (options != null) {
  51. validateObject(options, 'options')
  52. }
  53. if ((options === null || options === undefined ? undefined : options.signal) != null) {
  54. validateAbortSignal(options.signal, 'options.signal')
  55. }
  56. let concurrency = 1
  57. if ((options === null || options === undefined ? undefined : options.concurrency) != null) {
  58. concurrency = MathFloor(options.concurrency)
  59. }
  60. let highWaterMark = concurrency - 1
  61. if ((options === null || options === undefined ? undefined : options.highWaterMark) != null) {
  62. highWaterMark = MathFloor(options.highWaterMark)
  63. }
  64. validateInteger(concurrency, 'options.concurrency', 1)
  65. validateInteger(highWaterMark, 'options.highWaterMark', 0)
  // From here on `highWaterMark` is the cap on the queue length used below.
  66. highWaterMark += concurrency
  67. return async function* map() {
  // NOTE(review): AbortSignalAny presumably merges the given signals into one;
  // the list is empty when the caller supplied no signal - confirm in ours/util.
  68. const signal = require('../../ours/util').AbortSignalAny(
  69. [options === null || options === undefined ? undefined : options.signal].filter(Boolean)
  70. )
  71. const stream = this
  // Promises for pending/finished `fn` results, kept in source order.
  72. const queue = []
  73. const signalOpt = {
  74. signal
  75. }
  // `next` wakes the consumer loop, `resume` wakes the pump; each holds the
  // resolver of the promise the other side is currently awaiting.
  76. let next
  77. let resume
  // Set once either side has finished or failed; tells the pump to stop.
  78. let done = false
  // Number of `fn` results that have not settled yet.
  79. let cnt = 0
  // Rejection handler for a queued result: stop pumping, then do the normal
  // bookkeeping.
  80. function onCatch() {
  81. done = true
  82. afterItemProcessed()
  83. }
  84. function afterItemProcessed() {
  85. cnt -= 1
  86. maybeResume()
  87. }
  // Wakes the pump only when it is waiting and both limits have headroom.
  88. function maybeResume() {
  89. if (resume && !done && cnt < concurrency && queue.length < highWaterMark) {
  90. resume()
  91. resume = null
  92. }
  93. }
  // Producer side: reads the source, starts `fn` for each item and queues the
  // resulting promise.
  94. async function pump() {
  95. try {
  96. for await (let val of stream) {
  97. if (done) {
  98. return
  99. }
  100. if (signal.aborted) {
  101. throw new AbortError()
  102. }
  103. try {
  104. val = fn(val, signalOpt)
  // Only a synchronous `kEmpty` is caught here; a promise that resolves to
  // `kEmpty` is filtered by the consumer loop below.
  105. if (val === kEmpty) {
  106. continue
  107. }
  108. val = PromiseResolve(val)
  109. } catch (err) {
  // A synchronous throw from `fn` is queued as a rejection so it surfaces
  // to the consumer in order.
  110. val = PromiseReject(err)
  111. }
  112. cnt += 1
  113. PromisePrototypeThen(val, afterItemProcessed, onCatch)
  114. queue.push(val)
  115. if (next) {
  116. next()
  117. next = null
  118. }
  // Back-pressure: park until maybeResume() (or the consumer's cleanup)
  // calls `resume`.
  119. if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) {
  120. await new Promise((resolve) => {
  121. resume = resolve
  122. })
  123. }
  124. }
  125. queue.push(kEof)
  126. } catch (err) {
  // Errors from reading the source (or the AbortError above) are delivered
  // to the consumer through the queue as a rejected promise.
  127. const val = PromiseReject(err)
  128. PromisePrototypeThen(val, afterItemProcessed, onCatch)
  129. queue.push(val)
  130. } finally {
  131. done = true
  132. if (next) {
  133. next()
  134. next = null
  135. }
  136. }
  137. }
  // Not awaited: pump() catches its own errors and reports them via `queue`.
  138. pump()
  // Consumer side: yields settled results from the head of the queue.
  139. try {
  140. while (true) {
  141. while (queue.length > 0) {
  // Awaiting the head keeps output in source order; a rejection throws here.
  142. const val = await queue[0]
  143. if (val === kEof) {
  144. return
  145. }
  146. if (signal.aborted) {
  147. throw new AbortError()
  148. }
  149. if (val !== kEmpty) {
  150. yield val
  151. }
  152. queue.shift()
  153. maybeResume()
  154. }
  // Queue drained: wait until the pump pushes something or finishes.
  155. await new Promise((resolve) => {
  156. next = resolve
  157. })
  158. }
  159. } finally {
  // Runs on completion, error, or early return by the caller: mark the
  // operation finished and release a parked pump so it can see `done`.
  160. done = true
  161. if (resume) {
  162. resume()
  163. resume = null
  164. }
  165. }
  166. }.call(this)
  167. }
  /**
   * Returns an async generator yielding `[index, value]` pairs for every item
   * read from `this`, with `index` starting at 0.
   *
   * @param {{ signal?: AbortSignal }} [options] - Optional settings object.
   * @returns {AsyncGenerator<[number, *]>} Generator already bound to `this`.
   * @throws {AbortError} From the generator, when `options.signal` is aborted
   *   at the time an item is received; the signal's reason is attached as `cause`.
   */
  function asIndexedPairs(options = undefined) {
    if (options != null) {
      validateObject(options, 'options')
    }
    if ((options === null || options === undefined ? undefined : options.signal) != null) {
      validateAbortSignal(options.signal, 'options.signal')
    }
    return async function* asIndexedPairs() {
      let index = 0
      for await (const val of this) {
        const signal = options === null || options === undefined ? undefined : options.signal
        if (signal !== null && signal !== undefined && signal.aborted) {
          // The message comes first and the options bag second, matching how
          // reduce() and toArray() in this file build their AbortError. Passing
          // the bag first would turn it into the message and drop the cause.
          throw new AbortError(undefined, {
            cause: options.signal.reason
          })
        }
        yield [index++, val]
      }
    }.call(this)
  }
  /**
   * Resolves to `true` as soon as `fn` accepts one item of `this`, and to
   * `false` when the filtered sequence ends without producing anything.
   *
   * @param {Function} fn - Predicate forwarded to `filter`.
   * @param {object} [options] - Forwarded to `filter` unchanged.
   * @returns {Promise<boolean>}
   */
  async function some(fn, options = undefined) {
    const matches = filter.call(this, fn, options)
    // The first match settles the answer; leaving the loop closes the iterator.
    // eslint-disable-next-line no-unused-vars
    for await (const firstMatch of matches) {
      return true
    }
    return false
  }
  /**
   * Resolves to `true` when `fn` returns a truthy value for every item of
   * `this`. Implemented as "no item fails the predicate" on top of `some`.
   *
   * @param {Function} fn - Predicate; may be async.
   * @param {object} [options] - Forwarded to `some` unchanged.
   * @returns {Promise<boolean>}
   * @throws {ERR_INVALID_ARG_TYPE} (as a rejection) If `fn` is not a function.
   */
  async function every(fn, options = undefined) {
    if (typeof fn !== 'function') {
      throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
    }
    // De Morgan: every(p) === !some(!p)
    // https://en.wikipedia.org/wiki/De_Morgan%27s_laws
    const failsPredicate = async (...args) => {
      const verdict = await fn(...args)
      return !verdict
    }
    const foundCounterExample = await some.call(this, failsPredicate, options)
    return !foundCounterExample
  }
  /**
   * Resolves to the first item of `this` accepted by `fn`, or `undefined` when
   * the filtered sequence ends without producing anything.
   *
   * @param {Function} fn - Predicate forwarded to `filter`.
   * @param {object} [options] - Forwarded to `filter` unchanged.
   * @returns {Promise<*>}
   */
  async function find(fn, options) {
    const matches = filter.call(this, fn, options)
    for await (const firstMatch of matches) {
      // Returning from inside the loop closes the iterator.
      return firstMatch
    }
    return undefined
  }
  /**
   * Calls `fn` for every item of `this` and resolves once the sequence has
   * been fully consumed. Built on `map`, so `options` is interpreted there.
   *
   * @param {Function} fn - Called as `fn(value, options)` with the arguments
   *   `map` hands to its mapper; may be async.
   * @param {object} [options] - Forwarded to `map` unchanged.
   * @returns {Promise<void>}
   * @throws {ERR_INVALID_ARG_TYPE} (as a rejection) If `fn` is not a function.
   */
  async function forEach(fn, options) {
    if (typeof fn !== 'function') {
      throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
    }
    // Every visit reports kEmpty so that map never yields a value.
    const visit = async (value, callOptions) => {
      await fn(value, callOptions)
      return kEmpty
    }
    const drained = map.call(this, visit, options)
    // eslint-disable-next-line no-unused-vars
    for await (const unused of drained) {
      // Nothing to do: iterating is only needed to drive the mapper.
    }
  }
  /**
   * Returns the result of `map` with a mapper that keeps only the items for
   * which `fn` returns a truthy value.
   *
   * @param {Function} fn - Predicate called as `fn(value, options)` with the
   *   arguments `map` hands to its mapper; may be async.
   * @param {object} [options] - Forwarded to `map` unchanged.
   * @returns {*} Whatever `map` returns.
   * @throws {ERR_INVALID_ARG_TYPE} If `fn` is not a function.
   */
  function filter(fn, options) {
    if (typeof fn !== 'function') {
      throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
    }
    // Rejected items are reported as kEmpty, which map leaves out of its output.
    const keepOrSkip = async (value, callOptions) => {
      const accepted = await fn(value, callOptions)
      return accepted ? value : kEmpty
    }
    return map.call(this, keepOrSkip, options)
  }
  /**
   * ERR_MISSING_ARGS variant used by `reduce`. The initial value only counts
   * as "missing" when the stream turned out to be empty, so the error code is
   * still appropriate but the message is replaced with a reduce-specific one.
   */
  class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
    constructor() {
      const reduceMessage = 'Reduce of an empty stream requires an initial value'
      super('reduce')
      // Override the generic text produced by the base class.
      this.message = reduceMessage
    }
  }
  /**
   * Folds every item of `this` into a single value by repeatedly calling
   * `reducer(accumulator, value, { signal })`.
   *
   * When no initial value argument is passed at all, the first item becomes
   * the accumulator and the reducer is first called for the second item.
   *
   * @param {Function} reducer - May be async; its (awaited) result becomes the
   *   next accumulator. The `signal` it receives is aborted when this call
   *   finishes or when `options.signal` aborts.
   * @param {*} [initialValue] - Starting accumulator. Passing `undefined`
   *   explicitly still counts as providing one.
   * @param {{ signal?: AbortSignal }} [options] - Optional settings object.
   * @returns {Promise<*>} The final accumulator.
   * @throws {ERR_INVALID_ARG_TYPE} (as a rejection) If `reducer` is not a function.
   * @throws {AbortError} (as a rejection) If `options.signal` is already aborted,
   *   or is found aborted when an item arrives; the signal's reason is the `cause`.
   * @throws {ReduceAwareErrMissingArgs} (as a rejection) If the stream is empty
   *   and no initial value was given.
   */
  async function reduce(reducer, initialValue, options) {
    if (typeof reducer !== 'function') {
      throw new ERR_INVALID_ARG_TYPE('reducer', ['Function', 'AsyncFunction'], reducer)
    }
    if (options != null) {
      validateObject(options, 'options')
    }
    if ((options === null || options === undefined ? undefined : options.signal) != null) {
      validateAbortSignal(options.signal, 'options.signal')
    }
    // `arguments.length` (not `initialValue !== undefined`) so that an explicit
    // `undefined` initial value is honoured.
    let hasInitialValue = arguments.length > 1
    const userSignal = options === null || options === undefined ? undefined : options.signal
    if (userSignal !== null && userSignal !== undefined && userSignal.aborted) {
      const err = new AbortError(undefined, {
        cause: userSignal.reason
      })
      this.once('error', () => {}) // The error is already propagated
      await finished(this.destroy(err))
      throw err
    }
    const ac = new AbortController()
    const signal = ac.signal
    let onAbort = null
    if (userSignal) {
      onAbort = () => ac.abort()
      const opts = {
        once: true,
        [kWeakHandler]: this,
        [kResistStopPropagation]: true
      }
      userSignal.addEventListener('abort', onAbort, opts)
    }
    let gotAnyItemFromStream = false
    try {
      for await (const value of this) {
        gotAnyItemFromStream = true
        if (userSignal !== null && userSignal !== undefined && userSignal.aborted) {
          // Carry the abort reason, like the pre-aborted branch above does.
          throw new AbortError(undefined, {
            cause: userSignal.reason
          })
        }
        if (!hasInitialValue) {
          initialValue = value
          hasInitialValue = true
        } else {
          initialValue = await reducer(initialValue, value, {
            signal
          })
        }
      }
      if (!gotAnyItemFromStream && !hasInitialValue) {
        throw new ReduceAwareErrMissingArgs()
      }
    } finally {
      // Detach from the caller's signal so a long-lived signal does not keep
      // accumulating listeners. `kWeakHandler` is a symbol created in this
      // module, so it is not relied on for cleanup.
      if (onAbort !== null && typeof userSignal.removeEventListener === 'function') {
        userSignal.removeEventListener('abort', onAbort)
      }
      ac.abort()
    }
    return initialValue
  }
  /**
   * Collects every item of `this` into an array.
   *
   * @param {{ signal?: AbortSignal }} [options] - Optional settings object.
   * @returns {Promise<Array>} The items in the order they were read.
   * @throws {AbortError} (as a rejection) If `options.signal` is found aborted
   *   when an item arrives; the signal's reason is attached as `cause`.
   */
  async function toArray(options) {
    const hasOptions = options !== null && options !== undefined
    if (hasOptions) {
      validateObject(options, 'options')
    }
    if (hasOptions && options.signal != null) {
      validateAbortSignal(options.signal, 'options.signal')
    }

    const collected = []
    for await (const item of this) {
      // Re-read the signal on every item; it may have aborted in the meantime.
      const current = hasOptions ? options.signal : undefined
      if (current !== null && current !== undefined && current.aborted) {
        throw new AbortError(undefined, {
          cause: options.signal.reason
        })
      }
      ArrayPrototypePush(collected, item)
    }
    return collected
  }
  /**
   * Maps every item of `this` with `fn` (via `map`) and yields the contents of
   * each mapped result one by one.
   *
   * @param {Function} fn - Mapper forwarded to `map`; each result is delegated
   *   to with `yield*`, so it has to be iterable.
   * @param {object} [options] - Forwarded to `map` unchanged.
   * @returns {AsyncGenerator} Generator already bound to `this`.
   */
  function flatMap(fn, options) {
    // map() is invoked eagerly so its argument validation happens right away.
    const mapped = map.call(this, fn, options)
    const flatten = async function* flatMap() {
      for await (const inner of mapped) {
        yield* inner
      }
    }
    return flatten.call(this)
  }
  /**
   * Coerces `number` to a non-negative integer or `Infinity`, for use as an
   * item count by `drop` and `take`.
   *
   * @param {*} number - Any value; converted with `Number()`.
   * @returns {number} `0` for NaN, otherwise the value with its fractional
   *   part removed (`Infinity` stays `Infinity`).
   * @throws {ERR_OUT_OF_RANGE} If the coerced value is negative.
   */
  function toIntegerOrInfinity(number) {
    // We coerce here to align with the spec
    // https://github.com/tc39/proposal-iterator-helpers/issues/169
    const coerced = Number(number)
    if (NumberIsNaN(coerced)) {
      return 0
    }
    if (coerced < 0) {
      throw new ERR_OUT_OF_RANGE('number', '>= 0', coerced)
    }
    // Drop the fractional part so that e.g. take(1.5) takes one item, not two.
    // The value is non-negative here, so flooring equals truncation.
    return MathFloor(coerced)
  }
  /**
   * Returns an async generator that skips the first `number` items of `this`
   * and yields the rest.
   *
   * @param {*} number - Item count; normalised by `toIntegerOrInfinity`.
   * @param {{ signal?: AbortSignal }} [options] - Optional settings object.
   * @returns {AsyncGenerator} Generator already bound to `this`.
   * @throws {AbortError} From the generator, when `options.signal` is aborted
   *   at start or when an item arrives.
   */
  function drop(number, options = undefined) {
    const hasOptions = options !== null && options !== undefined
    if (hasOptions) {
      validateObject(options, 'options')
    }
    if (hasOptions && options.signal != null) {
      validateAbortSignal(options.signal, 'options.signal')
    }
    let toSkip = toIntegerOrInfinity(number)

    // Looks the signal up on every call so a later abort is noticed.
    const abortRequested = () => {
      if (!hasOptions) {
        return false
      }
      const current = options.signal
      return current !== null && current !== undefined && current.aborted
    }

    return async function* drop() {
      if (abortRequested()) {
        throw new AbortError()
      }
      for await (const item of this) {
        if (abortRequested()) {
          throw new AbortError()
        }
        const skipped = toSkip > 0
        toSkip -= 1
        if (!skipped) {
          yield item
        }
      }
    }.call(this)
  }
  /**
   * Returns an async generator that yields at most the first `number` items
   * of `this` and then stops reading.
   *
   * @param {*} number - Item count; normalised by `toIntegerOrInfinity`.
   * @param {{ signal?: AbortSignal }} [options] - Optional settings object.
   * @returns {AsyncGenerator} Generator already bound to `this`.
   * @throws {AbortError} From the generator, when `options.signal` is aborted
   *   at start or when an item arrives.
   */
  function take(number, options = undefined) {
    const hasOptions = options !== null && options !== undefined
    if (hasOptions) {
      validateObject(options, 'options')
    }
    if (hasOptions && options.signal != null) {
      validateAbortSignal(options.signal, 'options.signal')
    }
    let remaining = toIntegerOrInfinity(number)

    // Looks the signal up on every call so a later abort is noticed.
    const abortRequested = () => {
      if (!hasOptions) {
        return false
      }
      const current = options.signal
      return current !== null && current !== undefined && current.aborted
    }

    return async function* take() {
      if (abortRequested()) {
        throw new AbortError()
      }
      for await (const item of this) {
        if (abortRequested()) {
          throw new AbortError()
        }
        const wanted = remaining > 0
        remaining -= 1
        if (wanted) {
          yield item
        }
        // Stop right after the last wanted item instead of pulling one more
        // from the source.
        if (remaining <= 0) {
          return
        }
      }
    }.call(this)
  }
  // Operators exported under `streamReturningOperators`. `asIndexedPairs` is
  // wrapped with `deprecate` so it carries a removal notice.
  const streamReturningOperators = {
    asIndexedPairs: deprecate(asIndexedPairs, 'readable.asIndexedPairs will be removed in a future version.'),
    drop,
    filter,
    flatMap,
    map,
    take,
    compose
  }

  // Operators exported under `promiseReturningOperators` (all are async functions).
  const promiseReturningOperators = {
    every,
    forEach,
    reduce,
    toArray,
    some,
    find
  }

  module.exports.streamReturningOperators = streamReturningOperators
  module.exports.promiseReturningOperators = promiseReturningOperators