from.js 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. 'use strict'
  2. /* replacement start */
  3. const process = require('process/')
  4. /* replacement end */
  5. const { PromisePrototypeThen, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials')
  6. const { Buffer } = require('buffer')
  7. const { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } = require('../../ours/errors').codes
  /**
   * Creates a readable stream from a string, a Buffer, or a sync/async iterable.
   *
   * A string or Buffer is emitted as one single chunk. Any other input must
   * expose an async iterator or an iterator; its values are pushed one by one,
   * and values that look like promises are awaited before being pushed.
   *
   * @param {Function} Readable - Constructor used to create the stream.
   * @param {string|Buffer|Iterable|AsyncIterable} iterable - Data source.
   * @param {object} [opts] - Extra constructor options. They are spread after
   *   the defaults (`objectMode: true`, and `highWaterMark: 1` for iterables),
   *   so they override them.
   * @returns {object} The newly constructed `Readable` instance.
   * @throws {ERR_INVALID_ARG_TYPE} If `iterable` is not a string, not a Buffer,
   *   and exposes neither an async iterator nor an iterator.
   */
  function from(Readable, iterable, opts) {
    // Strings and Buffers are delivered whole, followed by end-of-stream.
    if (typeof iterable === 'string' || iterable instanceof Buffer) {
      const singleChunkOptions = {
        objectMode: true,
        ...opts,
        // Declared after the spread so that `opts.read` cannot replace it.
        read() {
          this.push(iterable)
          this.push(null)
        }
      }
      return new Readable(singleChunkOptions)
    }

    // The async iterator takes precedence when both protocols are available.
    const isAsync = !!(iterable && iterable[SymbolAsyncIterator])
    let iterator
    if (isAsync) {
      iterator = iterable[SymbolAsyncIterator]()
    } else if (iterable && iterable[SymbolIterator]) {
      iterator = iterable[SymbolIterator]()
    } else {
      throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable)
    }

    const streamOptions = {
      objectMode: true,
      highWaterMark: 1,
      // TODO(ronag): What options should be allowed?
      ...opts
    }
    const readable = new Readable(streamOptions)

    // Guards against _read starting a second pump
    // while the previous iteration is still in flight.
    let reading = false

    readable._read = function () {
      if (reading) {
        return
      }
      reading = true
      pump()
    }

    readable._destroy = function (error, cb) {
      // The callback is deferred with nextTick in case cb throws.
      const finish = (reason) => process.nextTick(cb, reason)
      PromisePrototypeThen(
        closeIterator(error),
        () => finish(error),
        (e) => finish(e || error)
      )
    }

    // True-ish when the candidate exposes a callable `then`.
    const isThenable = (candidate) => candidate && typeof candidate.then === 'function'

    // Lets the iterator clean up: forwards the error through `throw` when there
    // is one and the iterator supports it, then falls back to `return`.
    async function closeIterator(error) {
      const errored = error !== undefined && error !== null
      const canThrow = typeof iterator.throw === 'function'
      if (errored && canThrow) {
        const { value, done } = await iterator.throw(error)
        await value
        if (done) {
          return
        }
      }
      if (typeof iterator.return !== 'function') {
        return
      }
      const { value } = await iterator.return()
      await value
    }

    // Pulls from the iterator and pushes into the stream until the iterator is
    // exhausted, push() reports that no more data is wanted, or a step fails.
    async function pump() {
      while (true) {
        try {
          let step
          if (isAsync) {
            step = await iterator.next()
          } else {
            step = iterator.next()
          }
          const { value, done } = step
          if (done) {
            // `reading` stays set: nothing is left to pull.
            readable.push(null)
            return
          }
          const chunk = isThenable(value) ? await value : value
          if (chunk === null) {
            reading = false
            throw new ERR_STREAM_NULL_VALUES()
          }
          if (!readable.push(chunk)) {
            // Stop here; the next _read call restarts the pump.
            reading = false
            return
          }
        } catch (err) {
          readable.destroy(err)
          return
        }
      }
    }

    return readable
  }
  93. module.exports = from