// PullStream.js (3.7 KB)
  1. const Stream = require('stream');
  2. const util = require('util');
  3. const strFunction = 'function';
  4. function PullStream() {
  5. if (!(this instanceof PullStream))
  6. return new PullStream();
  7. Stream.Duplex.call(this, {decodeStrings:false, objectMode:true});
  8. this.buffer = Buffer.from('');
  9. const self = this;
  10. self.on('finish', function() {
  11. self.finished = true;
  12. self.emit('chunk', false);
  13. });
  14. }
  15. util.inherits(PullStream, Stream.Duplex);
  16. PullStream.prototype._write = function(chunk, e, cb) {
  17. this.buffer = Buffer.concat([this.buffer, chunk]);
  18. this.cb = cb;
  19. this.emit('chunk');
  20. };
  21. // The `eof` parameter is interpreted as `file_length` if the type is number
  22. // otherwise (i.e. buffer) it is interpreted as a pattern signaling end of stream
  23. PullStream.prototype.stream = function(eof, includeEof) {
  24. const p = Stream.PassThrough();
  25. let done;
  26. const self= this;
  27. function cb() {
  28. if (typeof self.cb === strFunction) {
  29. const callback = self.cb;
  30. self.cb = undefined;
  31. return callback();
  32. }
  33. }
  34. function pull() {
  35. let packet;
  36. if (self.buffer && self.buffer.length) {
  37. if (typeof eof === 'number') {
  38. packet = self.buffer.slice(0, eof);
  39. self.buffer = self.buffer.slice(eof);
  40. eof -= packet.length;
  41. done = done || !eof;
  42. } else {
  43. let match = self.buffer.indexOf(eof);
  44. if (match !== -1) {
  45. // store signature match byte offset to allow us to reference
  46. // this for zip64 offset
  47. self.match = match;
  48. if (includeEof) match = match + eof.length;
  49. packet = self.buffer.slice(0, match);
  50. self.buffer = self.buffer.slice(match);
  51. done = true;
  52. } else {
  53. const len = self.buffer.length - eof.length;
  54. if (len <= 0) {
  55. cb();
  56. } else {
  57. packet = self.buffer.slice(0, len);
  58. self.buffer = self.buffer.slice(len);
  59. }
  60. }
  61. }
  62. if (packet) p.write(packet, function() {
  63. if (self.buffer.length === 0 || (eof.length && self.buffer.length <= eof.length)) cb();
  64. });
  65. }
  66. if (!done) {
  67. if (self.finished) {
  68. self.removeListener('chunk', pull);
  69. self.emit('error', new Error('FILE_ENDED'));
  70. return;
  71. }
  72. } else {
  73. self.removeListener('chunk', pull);
  74. p.end();
  75. }
  76. }
  77. self.on('chunk', pull);
  78. pull();
  79. return p;
  80. };
  81. PullStream.prototype.pull = function(eof, includeEof) {
  82. if (eof === 0) return Promise.resolve('');
  83. // If we already have the required data in buffer
  84. // we can resolve the request immediately
  85. if (!isNaN(eof) && this.buffer.length > eof) {
  86. const data = this.buffer.slice(0, eof);
  87. this.buffer = this.buffer.slice(eof);
  88. return Promise.resolve(data);
  89. }
  90. // Otherwise we stream until we have it
  91. let buffer = Buffer.from('');
  92. const self = this;
  93. const concatStream = new Stream.Transform();
  94. concatStream._transform = function(d, e, cb) {
  95. buffer = Buffer.concat([buffer, d]);
  96. cb();
  97. };
  98. let rejectHandler;
  99. let pullStreamRejectHandler;
  100. return new Promise(function(resolve, reject) {
  101. rejectHandler = reject;
  102. pullStreamRejectHandler = function(e) {
  103. self.__emittedError = e;
  104. reject(e);
  105. };
  106. if (self.finished)
  107. return reject(new Error('FILE_ENDED'));
  108. self.once('error', pullStreamRejectHandler); // reject any errors from pullstream itself
  109. self.stream(eof, includeEof)
  110. .on('error', reject)
  111. .pipe(concatStream)
  112. .on('finish', function() {resolve(buffer);})
  113. .on('error', reject);
  114. })
  115. .finally(function() {
  116. self.removeListener('error', rejectHandler);
  117. self.removeListener('error', pullStreamRejectHandler);
  118. });
  119. };
  120. PullStream.prototype._read = function(){};
  121. module.exports = PullStream;