pipeline.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. /* replacement start */
  2. const process = require('process/')
  3. /* replacement end */
  4. // Ported from https://github.com/mafintosh/pump with
  5. // permission from the author, Mathias Buus (@mafintosh).
  6. ;('use strict')
  7. const { ArrayIsArray, Promise, SymbolAsyncIterator, SymbolDispose } = require('../../ours/primordials')
  8. const eos = require('./end-of-stream')
  9. const { once } = require('../../ours/util')
  10. const destroyImpl = require('./destroy')
  11. const Duplex = require('./duplex')
  12. const {
  13. aggregateTwoErrors,
  14. codes: {
  15. ERR_INVALID_ARG_TYPE,
  16. ERR_INVALID_RETURN_VALUE,
  17. ERR_MISSING_ARGS,
  18. ERR_STREAM_DESTROYED,
  19. ERR_STREAM_PREMATURE_CLOSE
  20. },
  21. AbortError
  22. } = require('../../ours/errors')
  23. const { validateFunction, validateAbortSignal } = require('../validators')
  24. const {
  25. isIterable,
  26. isReadable,
  27. isReadableNodeStream,
  28. isNodeStream,
  29. isTransformStream,
  30. isWebStream,
  31. isReadableStream,
  32. isReadableFinished
  33. } = require('./utils')
  34. const AbortController = globalThis.AbortController || require('abort-controller').AbortController
  35. let PassThrough
  36. let Readable
  37. let addAbortListener
  38. function destroyer(stream, reading, writing) {
  39. let finished = false
  40. stream.on('close', () => {
  41. finished = true
  42. })
  43. const cleanup = eos(
  44. stream,
  45. {
  46. readable: reading,
  47. writable: writing
  48. },
  49. (err) => {
  50. finished = !err
  51. }
  52. )
  53. return {
  54. destroy: (err) => {
  55. if (finished) return
  56. finished = true
  57. destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'))
  58. },
  59. cleanup
  60. }
  61. }
  62. function popCallback(streams) {
  63. // Streams should never be an empty array. It should always contain at least
  64. // a single stream. Therefore optimize for the average case instead of
  65. // checking for length === 0 as well.
  66. validateFunction(streams[streams.length - 1], 'streams[stream.length - 1]')
  67. return streams.pop()
  68. }
  69. function makeAsyncIterable(val) {
  70. if (isIterable(val)) {
  71. return val
  72. } else if (isReadableNodeStream(val)) {
  73. // Legacy streams are not Iterable.
  74. return fromReadable(val)
  75. }
  76. throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], val)
  77. }
  78. async function* fromReadable(val) {
  79. if (!Readable) {
  80. Readable = require('./readable')
  81. }
  82. yield* Readable.prototype[SymbolAsyncIterator].call(val)
  83. }
  84. async function pumpToNode(iterable, writable, finish, { end }) {
  85. let error
  86. let onresolve = null
  87. const resume = (err) => {
  88. if (err) {
  89. error = err
  90. }
  91. if (onresolve) {
  92. const callback = onresolve
  93. onresolve = null
  94. callback()
  95. }
  96. }
  97. const wait = () =>
  98. new Promise((resolve, reject) => {
  99. if (error) {
  100. reject(error)
  101. } else {
  102. onresolve = () => {
  103. if (error) {
  104. reject(error)
  105. } else {
  106. resolve()
  107. }
  108. }
  109. }
  110. })
  111. writable.on('drain', resume)
  112. const cleanup = eos(
  113. writable,
  114. {
  115. readable: false
  116. },
  117. resume
  118. )
  119. try {
  120. if (writable.writableNeedDrain) {
  121. await wait()
  122. }
  123. for await (const chunk of iterable) {
  124. if (!writable.write(chunk)) {
  125. await wait()
  126. }
  127. }
  128. if (end) {
  129. writable.end()
  130. await wait()
  131. }
  132. finish()
  133. } catch (err) {
  134. finish(error !== err ? aggregateTwoErrors(error, err) : err)
  135. } finally {
  136. cleanup()
  137. writable.off('drain', resume)
  138. }
  139. }
  140. async function pumpToWeb(readable, writable, finish, { end }) {
  141. if (isTransformStream(writable)) {
  142. writable = writable.writable
  143. }
  144. // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure
  145. const writer = writable.getWriter()
  146. try {
  147. for await (const chunk of readable) {
  148. await writer.ready
  149. writer.write(chunk).catch(() => {})
  150. }
  151. await writer.ready
  152. if (end) {
  153. await writer.close()
  154. }
  155. finish()
  156. } catch (err) {
  157. try {
  158. await writer.abort(err)
  159. finish(err)
  160. } catch (err) {
  161. finish(err)
  162. }
  163. }
  164. }
  165. function pipeline(...streams) {
  166. return pipelineImpl(streams, once(popCallback(streams)))
  167. }
  168. function pipelineImpl(streams, callback, opts) {
  169. if (streams.length === 1 && ArrayIsArray(streams[0])) {
  170. streams = streams[0]
  171. }
  172. if (streams.length < 2) {
  173. throw new ERR_MISSING_ARGS('streams')
  174. }
  175. const ac = new AbortController()
  176. const signal = ac.signal
  177. const outerSignal = opts === null || opts === undefined ? undefined : opts.signal
  178. // Need to cleanup event listeners if last stream is readable
  179. // https://github.com/nodejs/node/issues/35452
  180. const lastStreamCleanup = []
  181. validateAbortSignal(outerSignal, 'options.signal')
  182. function abort() {
  183. finishImpl(new AbortError())
  184. }
  185. addAbortListener = addAbortListener || require('../../ours/util').addAbortListener
  186. let disposable
  187. if (outerSignal) {
  188. disposable = addAbortListener(outerSignal, abort)
  189. }
  190. let error
  191. let value
  192. const destroys = []
  193. let finishCount = 0
  194. function finish(err) {
  195. finishImpl(err, --finishCount === 0)
  196. }
  197. function finishImpl(err, final) {
  198. var _disposable
  199. if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
  200. error = err
  201. }
  202. if (!error && !final) {
  203. return
  204. }
  205. while (destroys.length) {
  206. destroys.shift()(error)
  207. }
  208. ;(_disposable = disposable) === null || _disposable === undefined ? undefined : _disposable[SymbolDispose]()
  209. ac.abort()
  210. if (final) {
  211. if (!error) {
  212. lastStreamCleanup.forEach((fn) => fn())
  213. }
  214. process.nextTick(callback, error, value)
  215. }
  216. }
  217. let ret
  218. for (let i = 0; i < streams.length; i++) {
  219. const stream = streams[i]
  220. const reading = i < streams.length - 1
  221. const writing = i > 0
  222. const end = reading || (opts === null || opts === undefined ? undefined : opts.end) !== false
  223. const isLastStream = i === streams.length - 1
  224. if (isNodeStream(stream)) {
  225. if (end) {
  226. const { destroy, cleanup } = destroyer(stream, reading, writing)
  227. destroys.push(destroy)
  228. if (isReadable(stream) && isLastStream) {
  229. lastStreamCleanup.push(cleanup)
  230. }
  231. }
  232. // Catch stream errors that occur after pipe/pump has completed.
  233. function onError(err) {
  234. if (err && err.name !== 'AbortError' && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
  235. finish(err)
  236. }
  237. }
  238. stream.on('error', onError)
  239. if (isReadable(stream) && isLastStream) {
  240. lastStreamCleanup.push(() => {
  241. stream.removeListener('error', onError)
  242. })
  243. }
  244. }
  245. if (i === 0) {
  246. if (typeof stream === 'function') {
  247. ret = stream({
  248. signal
  249. })
  250. if (!isIterable(ret)) {
  251. throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or Stream', 'source', ret)
  252. }
  253. } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
  254. ret = stream
  255. } else {
  256. ret = Duplex.from(stream)
  257. }
  258. } else if (typeof stream === 'function') {
  259. if (isTransformStream(ret)) {
  260. var _ret
  261. ret = makeAsyncIterable((_ret = ret) === null || _ret === undefined ? undefined : _ret.readable)
  262. } else {
  263. ret = makeAsyncIterable(ret)
  264. }
  265. ret = stream(ret, {
  266. signal
  267. })
  268. if (reading) {
  269. if (!isIterable(ret, true)) {
  270. throw new ERR_INVALID_RETURN_VALUE('AsyncIterable', `transform[${i - 1}]`, ret)
  271. }
  272. } else {
  273. var _ret2
  274. if (!PassThrough) {
  275. PassThrough = require('./passthrough')
  276. }
  277. // If the last argument to pipeline is not a stream
  278. // we must create a proxy stream so that pipeline(...)
  279. // always returns a stream which can be further
  280. // composed through `.pipe(stream)`.
  281. const pt = new PassThrough({
  282. objectMode: true
  283. })
  284. // Handle Promises/A+ spec, `then` could be a getter that throws on
  285. // second use.
  286. const then = (_ret2 = ret) === null || _ret2 === undefined ? undefined : _ret2.then
  287. if (typeof then === 'function') {
  288. finishCount++
  289. then.call(
  290. ret,
  291. (val) => {
  292. value = val
  293. if (val != null) {
  294. pt.write(val)
  295. }
  296. if (end) {
  297. pt.end()
  298. }
  299. process.nextTick(finish)
  300. },
  301. (err) => {
  302. pt.destroy(err)
  303. process.nextTick(finish, err)
  304. }
  305. )
  306. } else if (isIterable(ret, true)) {
  307. finishCount++
  308. pumpToNode(ret, pt, finish, {
  309. end
  310. })
  311. } else if (isReadableStream(ret) || isTransformStream(ret)) {
  312. const toRead = ret.readable || ret
  313. finishCount++
  314. pumpToNode(toRead, pt, finish, {
  315. end
  316. })
  317. } else {
  318. throw new ERR_INVALID_RETURN_VALUE('AsyncIterable or Promise', 'destination', ret)
  319. }
  320. ret = pt
  321. const { destroy, cleanup } = destroyer(ret, false, true)
  322. destroys.push(destroy)
  323. if (isLastStream) {
  324. lastStreamCleanup.push(cleanup)
  325. }
  326. }
  327. } else if (isNodeStream(stream)) {
  328. if (isReadableNodeStream(ret)) {
  329. finishCount += 2
  330. const cleanup = pipe(ret, stream, finish, {
  331. end
  332. })
  333. if (isReadable(stream) && isLastStream) {
  334. lastStreamCleanup.push(cleanup)
  335. }
  336. } else if (isTransformStream(ret) || isReadableStream(ret)) {
  337. const toRead = ret.readable || ret
  338. finishCount++
  339. pumpToNode(toRead, stream, finish, {
  340. end
  341. })
  342. } else if (isIterable(ret)) {
  343. finishCount++
  344. pumpToNode(ret, stream, finish, {
  345. end
  346. })
  347. } else {
  348. throw new ERR_INVALID_ARG_TYPE(
  349. 'val',
  350. ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'],
  351. ret
  352. )
  353. }
  354. ret = stream
  355. } else if (isWebStream(stream)) {
  356. if (isReadableNodeStream(ret)) {
  357. finishCount++
  358. pumpToWeb(makeAsyncIterable(ret), stream, finish, {
  359. end
  360. })
  361. } else if (isReadableStream(ret) || isIterable(ret)) {
  362. finishCount++
  363. pumpToWeb(ret, stream, finish, {
  364. end
  365. })
  366. } else if (isTransformStream(ret)) {
  367. finishCount++
  368. pumpToWeb(ret.readable, stream, finish, {
  369. end
  370. })
  371. } else {
  372. throw new ERR_INVALID_ARG_TYPE(
  373. 'val',
  374. ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'],
  375. ret
  376. )
  377. }
  378. ret = stream
  379. } else {
  380. ret = Duplex.from(stream)
  381. }
  382. }
  383. if (
  384. (signal !== null && signal !== undefined && signal.aborted) ||
  385. (outerSignal !== null && outerSignal !== undefined && outerSignal.aborted)
  386. ) {
  387. process.nextTick(abort)
  388. }
  389. return ret
  390. }
  391. function pipe(src, dst, finish, { end }) {
  392. let ended = false
  393. dst.on('close', () => {
  394. if (!ended) {
  395. // Finish if the destination closes before the source has completed.
  396. finish(new ERR_STREAM_PREMATURE_CLOSE())
  397. }
  398. })
  399. src.pipe(dst, {
  400. end: false
  401. }) // If end is true we already will have a listener to end dst.
  402. if (end) {
  403. // Compat. Before node v10.12.0 stdio used to throw an error so
  404. // pipe() did/does not end() stdio destinations.
  405. // Now they allow it but "secretly" don't close the underlying fd.
  406. function endFn() {
  407. ended = true
  408. dst.end()
  409. }
  410. if (isReadableFinished(src)) {
  411. // End the destination if the source has already ended.
  412. process.nextTick(endFn)
  413. } else {
  414. src.once('end', endFn)
  415. }
  416. } else {
  417. finish()
  418. }
  419. eos(
  420. src,
  421. {
  422. readable: true,
  423. writable: false
  424. },
  425. (err) => {
  426. const rState = src._readableState
  427. if (
  428. err &&
  429. err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
  430. rState &&
  431. rState.ended &&
  432. !rState.errored &&
  433. !rState.errorEmitted
  434. ) {
  435. // Some readable streams will emit 'close' before 'end'. However, since
  436. // this is on the readable side 'end' should still be emitted if the
  437. // stream has been ended and no error emitted. This should be allowed in
  438. // favor of backwards compatibility. Since the stream is piped to a
  439. // destination this should not result in any observable difference.
  440. // We don't need to check if this is a writable premature close since
  441. // eos will only fail with premature close on the reading side for
  442. // duplex streams.
  443. src.once('end', finish).once('error', finish)
  444. } else {
  445. finish(err)
  446. }
  447. }
  448. )
  449. return eos(
  450. dst,
  451. {
  452. readable: false,
  453. writable: true
  454. },
  455. finish
  456. )
  457. }
  458. module.exports = {
  459. pipelineImpl,
  460. pipeline
  461. }