streaming.js 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports._iterSSEMessages = exports.Stream = void 0;
  4. const index_1 = require("./_shims/index.js");
  5. const error_1 = require("./error.js");
  6. const line_1 = require("./internal/decoders/line.js");
  7. const stream_utils_1 = require("./internal/stream-utils.js");
  8. const core_1 = require("./core.js");
  9. const error_2 = require("./error.js");
  10. class Stream {
  11. constructor(iterator, controller) {
  12. this.iterator = iterator;
  13. this.controller = controller;
  14. }
  15. static fromSSEResponse(response, controller) {
  16. let consumed = false;
  17. async function* iterator() {
  18. if (consumed) {
  19. throw new Error('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
  20. }
  21. consumed = true;
  22. let done = false;
  23. try {
  24. for await (const sse of _iterSSEMessages(response, controller)) {
  25. if (done)
  26. continue;
  27. if (sse.data.startsWith('[DONE]')) {
  28. done = true;
  29. continue;
  30. }
  31. if (sse.event === null ||
  32. sse.event.startsWith('response.') ||
  33. sse.event.startsWith('transcript.')) {
  34. let data;
  35. try {
  36. data = JSON.parse(sse.data);
  37. }
  38. catch (e) {
  39. console.error(`Could not parse message into JSON:`, sse.data);
  40. console.error(`From chunk:`, sse.raw);
  41. throw e;
  42. }
  43. if (data && data.error) {
  44. throw new error_2.APIError(undefined, data.error, undefined, (0, core_1.createResponseHeaders)(response.headers));
  45. }
  46. yield data;
  47. }
  48. else {
  49. let data;
  50. try {
  51. data = JSON.parse(sse.data);
  52. }
  53. catch (e) {
  54. console.error(`Could not parse message into JSON:`, sse.data);
  55. console.error(`From chunk:`, sse.raw);
  56. throw e;
  57. }
  58. // TODO: Is this where the error should be thrown?
  59. if (sse.event == 'error') {
  60. throw new error_2.APIError(undefined, data.error, data.message, undefined);
  61. }
  62. yield { event: sse.event, data: data };
  63. }
  64. }
  65. done = true;
  66. }
  67. catch (e) {
  68. // If the user calls `stream.controller.abort()`, we should exit without throwing.
  69. if (e instanceof Error && e.name === 'AbortError')
  70. return;
  71. throw e;
  72. }
  73. finally {
  74. // If the user `break`s, abort the ongoing request.
  75. if (!done)
  76. controller.abort();
  77. }
  78. }
  79. return new Stream(iterator, controller);
  80. }
  81. /**
  82. * Generates a Stream from a newline-separated ReadableStream
  83. * where each item is a JSON value.
  84. */
  85. static fromReadableStream(readableStream, controller) {
  86. let consumed = false;
  87. async function* iterLines() {
  88. const lineDecoder = new line_1.LineDecoder();
  89. const iter = (0, stream_utils_1.ReadableStreamToAsyncIterable)(readableStream);
  90. for await (const chunk of iter) {
  91. for (const line of lineDecoder.decode(chunk)) {
  92. yield line;
  93. }
  94. }
  95. for (const line of lineDecoder.flush()) {
  96. yield line;
  97. }
  98. }
  99. async function* iterator() {
  100. if (consumed) {
  101. throw new Error('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
  102. }
  103. consumed = true;
  104. let done = false;
  105. try {
  106. for await (const line of iterLines()) {
  107. if (done)
  108. continue;
  109. if (line)
  110. yield JSON.parse(line);
  111. }
  112. done = true;
  113. }
  114. catch (e) {
  115. // If the user calls `stream.controller.abort()`, we should exit without throwing.
  116. if (e instanceof Error && e.name === 'AbortError')
  117. return;
  118. throw e;
  119. }
  120. finally {
  121. // If the user `break`s, abort the ongoing request.
  122. if (!done)
  123. controller.abort();
  124. }
  125. }
  126. return new Stream(iterator, controller);
  127. }
  128. [Symbol.asyncIterator]() {
  129. return this.iterator();
  130. }
  131. /**
  132. * Splits the stream into two streams which can be
  133. * independently read from at different speeds.
  134. */
  135. tee() {
  136. const left = [];
  137. const right = [];
  138. const iterator = this.iterator();
  139. const teeIterator = (queue) => {
  140. return {
  141. next: () => {
  142. if (queue.length === 0) {
  143. const result = iterator.next();
  144. left.push(result);
  145. right.push(result);
  146. }
  147. return queue.shift();
  148. },
  149. };
  150. };
  151. return [
  152. new Stream(() => teeIterator(left), this.controller),
  153. new Stream(() => teeIterator(right), this.controller),
  154. ];
  155. }
  156. /**
  157. * Converts this stream to a newline-separated ReadableStream of
  158. * JSON stringified values in the stream
  159. * which can be turned back into a Stream with `Stream.fromReadableStream()`.
  160. */
  161. toReadableStream() {
  162. const self = this;
  163. let iter;
  164. const encoder = new TextEncoder();
  165. return new index_1.ReadableStream({
  166. async start() {
  167. iter = self[Symbol.asyncIterator]();
  168. },
  169. async pull(ctrl) {
  170. try {
  171. const { value, done } = await iter.next();
  172. if (done)
  173. return ctrl.close();
  174. const bytes = encoder.encode(JSON.stringify(value) + '\n');
  175. ctrl.enqueue(bytes);
  176. }
  177. catch (err) {
  178. ctrl.error(err);
  179. }
  180. },
  181. async cancel() {
  182. await iter.return?.();
  183. },
  184. });
  185. }
  186. }
  187. exports.Stream = Stream;
  188. async function* _iterSSEMessages(response, controller) {
  189. if (!response.body) {
  190. controller.abort();
  191. throw new error_1.OpenAIError(`Attempted to iterate over a response with no body`);
  192. }
  193. const sseDecoder = new SSEDecoder();
  194. const lineDecoder = new line_1.LineDecoder();
  195. const iter = (0, stream_utils_1.ReadableStreamToAsyncIterable)(response.body);
  196. for await (const sseChunk of iterSSEChunks(iter)) {
  197. for (const line of lineDecoder.decode(sseChunk)) {
  198. const sse = sseDecoder.decode(line);
  199. if (sse)
  200. yield sse;
  201. }
  202. }
  203. for (const line of lineDecoder.flush()) {
  204. const sse = sseDecoder.decode(line);
  205. if (sse)
  206. yield sse;
  207. }
  208. }
  209. exports._iterSSEMessages = _iterSSEMessages;
  210. /**
  211. * Given an async iterable iterator, iterates over it and yields full
  212. * SSE chunks, i.e. yields when a double new-line is encountered.
  213. */
  214. async function* iterSSEChunks(iterator) {
  215. let data = new Uint8Array();
  216. for await (const chunk of iterator) {
  217. if (chunk == null) {
  218. continue;
  219. }
  220. const binaryChunk = chunk instanceof ArrayBuffer ? new Uint8Array(chunk)
  221. : typeof chunk === 'string' ? new TextEncoder().encode(chunk)
  222. : chunk;
  223. let newData = new Uint8Array(data.length + binaryChunk.length);
  224. newData.set(data);
  225. newData.set(binaryChunk, data.length);
  226. data = newData;
  227. let patternIndex;
  228. while ((patternIndex = (0, line_1.findDoubleNewlineIndex)(data)) !== -1) {
  229. yield data.slice(0, patternIndex);
  230. data = data.slice(patternIndex);
  231. }
  232. }
  233. if (data.length > 0) {
  234. yield data;
  235. }
  236. }
  237. class SSEDecoder {
  238. constructor() {
  239. this.event = null;
  240. this.data = [];
  241. this.chunks = [];
  242. }
  243. decode(line) {
  244. if (line.endsWith('\r')) {
  245. line = line.substring(0, line.length - 1);
  246. }
  247. if (!line) {
  248. // empty line and we didn't previously encounter any messages
  249. if (!this.event && !this.data.length)
  250. return null;
  251. const sse = {
  252. event: this.event,
  253. data: this.data.join('\n'),
  254. raw: this.chunks,
  255. };
  256. this.event = null;
  257. this.data = [];
  258. this.chunks = [];
  259. return sse;
  260. }
  261. this.chunks.push(line);
  262. if (line.startsWith(':')) {
  263. return null;
  264. }
  265. let [fieldname, _, value] = partition(line, ':');
  266. if (value.startsWith(' ')) {
  267. value = value.substring(1);
  268. }
  269. if (fieldname === 'event') {
  270. this.event = value;
  271. }
  272. else if (fieldname === 'data') {
  273. this.data.push(value);
  274. }
  275. return null;
  276. }
  277. }
  278. function partition(str, delimiter) {
  279. const index = str.indexOf(delimiter);
  280. if (index !== -1) {
  281. return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)];
  282. }
  283. return [str, '', ''];
  284. }
  285. //# sourceMappingURL=streaming.js.map