// streaming.mjs (9.6 KB)

  1. import { ReadableStream } from "./_shims/index.mjs";
  2. import { OpenAIError } from "./error.mjs";
  3. import { findDoubleNewlineIndex, LineDecoder } from "./internal/decoders/line.mjs";
  4. import { ReadableStreamToAsyncIterable } from "./internal/stream-utils.mjs";
  5. import { createResponseHeaders } from "./core.mjs";
  6. import { APIError } from "./error.mjs";
  /**
   * Async-iterable stream of decoded values, paired with the abort
   * controller of the request that produces them.
   */
  export class Stream {
    /**
     * @param iterator - Function returning an async iterator; it is called
     *   whenever the stream is iterated, teed, or converted.
     * @param controller - Stored as `this.controller`; the factories below
     *   call `abort()` on it when iteration stops before the source is done.
     */
    constructor(iterator, controller) {
      this.iterator = iterator;
      this.controller = controller;
    }

    /**
     * Generates a Stream from a server-sent-events response.
     *
     * Messages are read with `_iterSSEMessages`. A message whose data starts
     * with `[DONE]` ends the logical stream: the remaining messages are
     * drained but not yielded. Messages with no event name, or with an event
     * name starting with `response.` or `transcript.`, yield their parsed
     * JSON data; every other message yields `{ event, data }`.
     *
     * The returned stream can be iterated only once; a second iteration
     * throws. While iterating, an `APIError` is thrown for an un-named /
     * `response.*` / `transcript.*` message whose payload has a truthy
     * `error` property and for any message whose event name is `error`;
     * JSON parse failures are logged and rethrown.
     *
     * @param response - Passed to `_iterSSEMessages`; `response.headers` is
     *   attached to errors raised for payloads carrying `error`.
     * @param controller - Aborted when iteration ends before the source is
     *   exhausted (e.g. the consumer `break`s or an error is thrown).
     */
    static fromSSEResponse(response, controller) {
      let consumed = false;

      // Parses the JSON payload of one SSE message; on failure logs the
      // payload and the raw lines it came from, then rethrows.
      function parseData(sse) {
        try {
          return JSON.parse(sse.data);
        } catch (e) {
          console.error(`Could not parse message into JSON:`, sse.data);
          console.error(`From chunk:`, sse.raw);
          throw e;
        }
      }

      async function* iterator() {
        if (consumed) {
          throw new Error('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
        }
        consumed = true;
        let done = false;
        try {
          for await (const sse of _iterSSEMessages(response, controller)) {
            // After `[DONE]` keep reading so the source is fully drained.
            if (done) {
              continue;
            }
            if (sse.data.startsWith('[DONE]')) {
              done = true;
              continue;
            }
            const yieldsBareData =
              sse.event === null ||
              sse.event.startsWith('response.') ||
              sse.event.startsWith('transcript.');
            const data = parseData(sse);
            if (yieldsBareData) {
              if (data && data.error) {
                throw new APIError(undefined, data.error, undefined, createResponseHeaders(response.headers));
              }
              yield data;
            } else {
              // TODO: Is this where the error should be thrown?
              if (sse.event === 'error') {
                // `data` may legitimately parse to `null`; still raise an APIError.
                throw new APIError(undefined, data?.error, data?.message, undefined);
              }
              yield { event: sse.event, data };
            }
          }
          done = true;
        } catch (e) {
          // If the user calls `stream.controller.abort()`, we should exit without throwing.
          if (e instanceof Error && e.name === 'AbortError') {
            return;
          }
          throw e;
        } finally {
          // If the user `break`s, abort the ongoing request.
          if (!done) {
            controller.abort();
          }
        }
      }
      return new Stream(iterator, controller);
    }

    /**
     * Generates a Stream from a newline-separated ReadableStream
     * where each item is a JSON value.
     *
     * Empty lines are skipped. The returned stream can be iterated only
     * once; a second iteration throws.
     *
     * @param readableStream - Passed to `ReadableStreamToAsyncIterable`; its
     *   chunks are split into lines by a `LineDecoder`.
     * @param controller - Aborted when iteration ends before the source is
     *   exhausted.
     */
    static fromReadableStream(readableStream, controller) {
      let consumed = false;

      // Yields every line of the source, including lines still buffered by
      // the decoder once the source ends.
      async function* iterLines() {
        const lineDecoder = new LineDecoder();
        const iter = ReadableStreamToAsyncIterable(readableStream);
        for await (const chunk of iter) {
          for (const line of lineDecoder.decode(chunk)) {
            yield line;
          }
        }
        for (const line of lineDecoder.flush()) {
          yield line;
        }
      }

      async function* iterator() {
        if (consumed) {
          throw new Error('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
        }
        consumed = true;
        let done = false;
        try {
          for await (const line of iterLines()) {
            if (done) {
              continue;
            }
            if (line) {
              yield JSON.parse(line);
            }
          }
          done = true;
        } catch (e) {
          // If the user calls `stream.controller.abort()`, we should exit without throwing.
          if (e instanceof Error && e.name === 'AbortError') {
            return;
          }
          throw e;
        } finally {
          // If the user `break`s, abort the ongoing request.
          if (!done) {
            controller.abort();
          }
        }
      }
      return new Stream(iterator, controller);
    }

    /** Starts a new iteration by calling the stored iterator factory. */
    [Symbol.asyncIterator]() {
      return this.iterator();
    }

    /**
     * Splits the stream into two streams which can be
     * independently read from at different speeds.
     *
     * Both returned streams share this stream's controller.
     */
    tee() {
      const left = [];
      const right = [];
      const iterator = this.iterator();
      const teeIterator = (queue) => {
        return {
          next: () => {
            if (queue.length === 0) {
              // Pull once from the source and hand the same pending result
              // to both sides, so the slower side finds it queued later.
              const result = iterator.next();
              left.push(result);
              right.push(result);
            }
            return queue.shift();
          },
        };
      };
      return [
        new Stream(() => teeIterator(left), this.controller),
        new Stream(() => teeIterator(right), this.controller),
      ];
    }

    /**
     * Converts this stream to a newline-separated ReadableStream of
     * JSON stringified values in the stream
     * which can be turned back into a Stream with `Stream.fromReadableStream()`.
     */
    toReadableStream() {
      const self = this;
      let iter;
      const encoder = new TextEncoder();
      return new ReadableStream({
        async start() {
          iter = self[Symbol.asyncIterator]();
        },
        async pull(ctrl) {
          try {
            const { value, done } = await iter.next();
            if (done) {
              return ctrl.close();
            }
            const bytes = encoder.encode(JSON.stringify(value) + '\n');
            ctrl.enqueue(bytes);
          } catch (err) {
            ctrl.error(err);
          }
        },
        async cancel() {
          // Lets the underlying iterator run its cleanup, when it has any.
          await iter.return?.();
        },
      });
    }
  }
  /**
   * Decodes the body of `response` into server-sent-event messages.
   *
   * The body is split into chunks by `iterSSEChunks`, each chunk into lines
   * by a `LineDecoder`, and each line is fed to an `SSEDecoder`; every
   * non-null value the SSE decoder returns is yielded.
   *
   * @param response - Object whose `body` is passed to
   *   `ReadableStreamToAsyncIterable`.
   * @param controller - Aborted when the response has no body.
   * @throws {OpenAIError} When `response.body` is falsy.
   */
  export async function* _iterSSEMessages(response, controller) {
    if (!response.body) {
      controller.abort();
      throw new OpenAIError(`Attempted to iterate over a response with no body`);
    }
    const sseDecoder = new SSEDecoder();
    const lineDecoder = new LineDecoder();

    // Feeds lines to the SSE decoder and yields each completed message.
    function* decodeLines(lines) {
      for (const line of lines) {
        const sse = sseDecoder.decode(line);
        if (sse) {
          yield sse;
        }
      }
    }

    const iter = ReadableStreamToAsyncIterable(response.body);
    for await (const sseChunk of iterSSEChunks(iter)) {
      yield* decodeLines(lineDecoder.decode(sseChunk));
    }
    // Lines still buffered by the line decoder once the body has ended.
    yield* decodeLines(lineDecoder.flush());
  }
  /**
   * Given an async iterable iterator, iterates over it and yields full
   * SSE chunks, i.e. yields when a double new-line is encountered.
   *
   * `null`/`undefined` chunks are skipped; `ArrayBuffer` and string chunks
   * are converted to `Uint8Array` (strings via `TextEncoder`); any other
   * chunk is used as-is and is presumably already a `Uint8Array`.
   * Bytes left over when the source ends are yielded as a final chunk.
   *
   * @param iterator - Async iterable of chunks.
   */
  async function* iterSSEChunks(iterator) {
    // One encoder serves every string chunk.
    const encoder = new TextEncoder();
    let data = new Uint8Array();
    for await (const chunk of iterator) {
      if (chunk == null) {
        continue;
      }
      const binaryChunk = chunk instanceof ArrayBuffer ? new Uint8Array(chunk)
        : typeof chunk === 'string' ? encoder.encode(chunk)
        : chunk;
      // Append the new bytes to whatever was left from earlier chunks.
      const merged = new Uint8Array(data.length + binaryChunk.length);
      merged.set(data);
      merged.set(binaryChunk, data.length);
      data = merged;
      let patternIndex;
      // `findDoubleNewlineIndex` presumably returns the index just past the
      // separator: everything before it is emitted, the rest is kept.
      while ((patternIndex = findDoubleNewlineIndex(data)) !== -1) {
        yield data.slice(0, patternIndex);
        data = data.slice(patternIndex);
      }
    }
    if (data.length > 0) {
      yield data;
    }
  }
  /**
   * Incremental, line-by-line decoder for server-sent events.
   *
   * Feed it one line at a time through `decode()`. Field lines accumulate
   * state; an empty line completes the pending message and returns it.
   */
  class SSEDecoder {
    constructor() {
      // Value of the most recent `event` field, or null when none was seen.
      this.event = null;
      // Values of the `data` fields seen so far, joined with '\n' on dispatch.
      this.data = [];
      // Every non-empty line seen since the last dispatched message.
      this.chunks = [];
    }

    /**
     * Consumes one line of the event stream.
     *
     * @param line - A single line without its line feed; one trailing `\r`
     *   is stripped.
     * @returns `{ event, data, raw }` when `line` is empty and an event name
     *   or data has been collected; otherwise `null`.
     */
    decode(line) {
      const text = line.endsWith('\r') ? line.substring(0, line.length - 1) : line;

      if (!text) {
        // empty line and we didn't previously encounter any messages
        if (!this.event && !this.data.length) {
          return null;
        }
        const sse = {
          event: this.event,
          data: this.data.join('\n'),
          raw: this.chunks,
        };
        this.event = null;
        this.data = [];
        this.chunks = [];
        return sse;
      }

      this.chunks.push(text);

      // Lines starting with ':' are recorded in `raw` but carry no field.
      if (text.startsWith(':')) {
        return null;
      }

      const [fieldname, , rest] = partition(text, ':');
      // A single space after the colon is not part of the value.
      const value = rest.startsWith(' ') ? rest.substring(1) : rest;

      if (fieldname === 'event') {
        this.event = value;
      } else if (fieldname === 'data') {
        this.data.push(value);
      }
      // Any other field name is ignored.
      return null;
    }
  }
  /**
   * Splits `str` around the first occurrence of `delimiter`.
   *
   * @param str - String to split.
   * @param delimiter - Separator to look for.
   * @returns A three-element array `[before, delimiter, after]`; when the
   *   delimiter does not occur in `str`, `[str, '', '']`.
   */
  function partition(str, delimiter) {
    const index = str.indexOf(delimiter);
    if (index === -1) {
      return [str, '', ''];
    }
    const before = str.substring(0, index);
    const after = str.substring(index + delimiter.length);
    return [before, delimiter, after];
  }
  //# sourceMappingURL=streaming.mjs.map