duplexify.js 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. /* replacement start */
  2. const process = require('process/')
  3. /* replacement end */
  4. ;('use strict')
  5. const bufferModule = require('buffer')
  6. const {
  7. isReadable,
  8. isWritable,
  9. isIterable,
  10. isNodeStream,
  11. isReadableNodeStream,
  12. isWritableNodeStream,
  13. isDuplexNodeStream,
  14. isReadableStream,
  15. isWritableStream
  16. } = require('./utils')
  17. const eos = require('./end-of-stream')
  18. const {
  19. AbortError,
  20. codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE }
  21. } = require('../../ours/errors')
  22. const { destroyer } = require('./destroy')
  23. const Duplex = require('./duplex')
  24. const Readable = require('./readable')
  25. const Writable = require('./writable')
  26. const { createDeferredPromise } = require('../../ours/util')
  27. const from = require('./from')
  28. const Blob = globalThis.Blob || bufferModule.Blob
  29. const isBlob =
  30. typeof Blob !== 'undefined'
  31. ? function isBlob(b) {
  32. return b instanceof Blob
  33. }
  34. : function isBlob(b) {
  35. return false
  36. }
  37. const AbortController = globalThis.AbortController || require('abort-controller').AbortController
  38. const { FunctionPrototypeCall } = require('../../ours/primordials')
  39. // This is needed for pre node 17.
  40. class Duplexify extends Duplex {
  41. constructor(options) {
  42. super(options)
  43. // https://github.com/nodejs/node/pull/34385
  44. if ((options === null || options === undefined ? undefined : options.readable) === false) {
  45. this._readableState.readable = false
  46. this._readableState.ended = true
  47. this._readableState.endEmitted = true
  48. }
  49. if ((options === null || options === undefined ? undefined : options.writable) === false) {
  50. this._writableState.writable = false
  51. this._writableState.ending = true
  52. this._writableState.ended = true
  53. this._writableState.finished = true
  54. }
  55. }
  56. }
  57. module.exports = function duplexify(body, name) {
  58. if (isDuplexNodeStream(body)) {
  59. return body
  60. }
  61. if (isReadableNodeStream(body)) {
  62. return _duplexify({
  63. readable: body
  64. })
  65. }
  66. if (isWritableNodeStream(body)) {
  67. return _duplexify({
  68. writable: body
  69. })
  70. }
  71. if (isNodeStream(body)) {
  72. return _duplexify({
  73. writable: false,
  74. readable: false
  75. })
  76. }
  77. if (isReadableStream(body)) {
  78. return _duplexify({
  79. readable: Readable.fromWeb(body)
  80. })
  81. }
  82. if (isWritableStream(body)) {
  83. return _duplexify({
  84. writable: Writable.fromWeb(body)
  85. })
  86. }
  87. if (typeof body === 'function') {
  88. const { value, write, final, destroy } = fromAsyncGen(body)
  89. if (isIterable(value)) {
  90. return from(Duplexify, value, {
  91. // TODO (ronag): highWaterMark?
  92. objectMode: true,
  93. write,
  94. final,
  95. destroy
  96. })
  97. }
  98. const then = value === null || value === undefined ? undefined : value.then
  99. if (typeof then === 'function') {
  100. let d
  101. const promise = FunctionPrototypeCall(
  102. then,
  103. value,
  104. (val) => {
  105. if (val != null) {
  106. throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val)
  107. }
  108. },
  109. (err) => {
  110. destroyer(d, err)
  111. }
  112. )
  113. return (d = new Duplexify({
  114. // TODO (ronag): highWaterMark?
  115. objectMode: true,
  116. readable: false,
  117. write,
  118. final(cb) {
  119. final(async () => {
  120. try {
  121. await promise
  122. process.nextTick(cb, null)
  123. } catch (err) {
  124. process.nextTick(cb, err)
  125. }
  126. })
  127. },
  128. destroy
  129. }))
  130. }
  131. throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or AsyncFunction', name, value)
  132. }
  133. if (isBlob(body)) {
  134. return duplexify(body.arrayBuffer())
  135. }
  136. if (isIterable(body)) {
  137. return from(Duplexify, body, {
  138. // TODO (ronag): highWaterMark?
  139. objectMode: true,
  140. writable: false
  141. })
  142. }
  143. if (
  144. isReadableStream(body === null || body === undefined ? undefined : body.readable) &&
  145. isWritableStream(body === null || body === undefined ? undefined : body.writable)
  146. ) {
  147. return Duplexify.fromWeb(body)
  148. }
  149. if (
  150. typeof (body === null || body === undefined ? undefined : body.writable) === 'object' ||
  151. typeof (body === null || body === undefined ? undefined : body.readable) === 'object'
  152. ) {
  153. const readable =
  154. body !== null && body !== undefined && body.readable
  155. ? isReadableNodeStream(body === null || body === undefined ? undefined : body.readable)
  156. ? body === null || body === undefined
  157. ? undefined
  158. : body.readable
  159. : duplexify(body.readable)
  160. : undefined
  161. const writable =
  162. body !== null && body !== undefined && body.writable
  163. ? isWritableNodeStream(body === null || body === undefined ? undefined : body.writable)
  164. ? body === null || body === undefined
  165. ? undefined
  166. : body.writable
  167. : duplexify(body.writable)
  168. : undefined
  169. return _duplexify({
  170. readable,
  171. writable
  172. })
  173. }
  174. const then = body === null || body === undefined ? undefined : body.then
  175. if (typeof then === 'function') {
  176. let d
  177. FunctionPrototypeCall(
  178. then,
  179. body,
  180. (val) => {
  181. if (val != null) {
  182. d.push(val)
  183. }
  184. d.push(null)
  185. },
  186. (err) => {
  187. destroyer(d, err)
  188. }
  189. )
  190. return (d = new Duplexify({
  191. objectMode: true,
  192. writable: false,
  193. read() {}
  194. }))
  195. }
  196. throw new ERR_INVALID_ARG_TYPE(
  197. name,
  198. [
  199. 'Blob',
  200. 'ReadableStream',
  201. 'WritableStream',
  202. 'Stream',
  203. 'Iterable',
  204. 'AsyncIterable',
  205. 'Function',
  206. '{ readable, writable } pair',
  207. 'Promise'
  208. ],
  209. body
  210. )
  211. }
  212. function fromAsyncGen(fn) {
  213. let { promise, resolve } = createDeferredPromise()
  214. const ac = new AbortController()
  215. const signal = ac.signal
  216. const value = fn(
  217. (async function* () {
  218. while (true) {
  219. const _promise = promise
  220. promise = null
  221. const { chunk, done, cb } = await _promise
  222. process.nextTick(cb)
  223. if (done) return
  224. if (signal.aborted)
  225. throw new AbortError(undefined, {
  226. cause: signal.reason
  227. })
  228. ;({ promise, resolve } = createDeferredPromise())
  229. yield chunk
  230. }
  231. })(),
  232. {
  233. signal
  234. }
  235. )
  236. return {
  237. value,
  238. write(chunk, encoding, cb) {
  239. const _resolve = resolve
  240. resolve = null
  241. _resolve({
  242. chunk,
  243. done: false,
  244. cb
  245. })
  246. },
  247. final(cb) {
  248. const _resolve = resolve
  249. resolve = null
  250. _resolve({
  251. done: true,
  252. cb
  253. })
  254. },
  255. destroy(err, cb) {
  256. ac.abort()
  257. cb(err)
  258. }
  259. }
  260. }
  261. function _duplexify(pair) {
  262. const r = pair.readable && typeof pair.readable.read !== 'function' ? Readable.wrap(pair.readable) : pair.readable
  263. const w = pair.writable
  264. let readable = !!isReadable(r)
  265. let writable = !!isWritable(w)
  266. let ondrain
  267. let onfinish
  268. let onreadable
  269. let onclose
  270. let d
  271. function onfinished(err) {
  272. const cb = onclose
  273. onclose = null
  274. if (cb) {
  275. cb(err)
  276. } else if (err) {
  277. d.destroy(err)
  278. }
  279. }
  280. // TODO(ronag): Avoid double buffering.
  281. // Implement Writable/Readable/Duplex traits.
  282. // See, https://github.com/nodejs/node/pull/33515.
  283. d = new Duplexify({
  284. // TODO (ronag): highWaterMark?
  285. readableObjectMode: !!(r !== null && r !== undefined && r.readableObjectMode),
  286. writableObjectMode: !!(w !== null && w !== undefined && w.writableObjectMode),
  287. readable,
  288. writable
  289. })
  290. if (writable) {
  291. eos(w, (err) => {
  292. writable = false
  293. if (err) {
  294. destroyer(r, err)
  295. }
  296. onfinished(err)
  297. })
  298. d._write = function (chunk, encoding, callback) {
  299. if (w.write(chunk, encoding)) {
  300. callback()
  301. } else {
  302. ondrain = callback
  303. }
  304. }
  305. d._final = function (callback) {
  306. w.end()
  307. onfinish = callback
  308. }
  309. w.on('drain', function () {
  310. if (ondrain) {
  311. const cb = ondrain
  312. ondrain = null
  313. cb()
  314. }
  315. })
  316. w.on('finish', function () {
  317. if (onfinish) {
  318. const cb = onfinish
  319. onfinish = null
  320. cb()
  321. }
  322. })
  323. }
  324. if (readable) {
  325. eos(r, (err) => {
  326. readable = false
  327. if (err) {
  328. destroyer(r, err)
  329. }
  330. onfinished(err)
  331. })
  332. r.on('readable', function () {
  333. if (onreadable) {
  334. const cb = onreadable
  335. onreadable = null
  336. cb()
  337. }
  338. })
  339. r.on('end', function () {
  340. d.push(null)
  341. })
  342. d._read = function () {
  343. while (true) {
  344. const buf = r.read()
  345. if (buf === null) {
  346. onreadable = d._read
  347. return
  348. }
  349. if (!d.push(buf)) {
  350. return
  351. }
  352. }
  353. }
  354. }
  355. d._destroy = function (err, callback) {
  356. if (!err && onclose !== null) {
  357. err = new AbortError()
  358. }
  359. onreadable = null
  360. ondrain = null
  361. onfinish = null
  362. if (onclose === null) {
  363. callback(err)
  364. } else {
  365. onclose = callback
  366. destroyer(w, err)
  367. destroyer(r, err)
  368. }
  369. }
  370. return d
  371. }