| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports._iterSSEMessages = exports.Stream = void 0;
- const index_1 = require("./_shims/index.js");
- const error_1 = require("./error.js");
- const line_1 = require("./internal/decoders/line.js");
- const stream_utils_1 = require("./internal/stream-utils.js");
- const core_1 = require("./core.js");
- const error_2 = require("./error.js");
/**
 * Async-iterable wrapper that pairs an iterator factory with the controller
 * used to abort the work feeding it.
 */
class Stream {
  /**
   * @param iterator - Zero-argument function; it is invoked each time an
   *   iterator over the stream is requested.
   * @param controller - Object exposing `abort()`; the factory methods call it
   *   when iteration stops before the source is exhausted.
   */
  constructor(iterator, controller) {
    this.iterator = iterator;
    this.controller = controller;
  }

  /**
   * Builds a single-use Stream over the server-sent events of `response`.
   *
   * Messages with no event name, or whose event name starts with `response.`
   * or `transcript.`, are yielded as their parsed JSON payload. Every other
   * message is yielded as `{ event, data }`. A message whose data starts with
   * `[DONE]` ends the output; later messages are read but not yielded.
   *
   * @param response - Response whose events are read via `_iterSSEMessages`.
   * @param controller - Object exposing `abort()`.
   * @returns {Stream}
   */
  static fromSSEResponse(response, controller) {
    let consumed = false;

    // Parses one message's `data` as JSON. On failure the payload and the raw
    // lines are logged before the original error is rethrown.
    const parsePayload = (sse) => {
      try {
        return JSON.parse(sse.data);
      } catch (e) {
        console.error(`Could not parse message into JSON:`, sse.data);
        console.error(`From chunk:`, sse.raw);
        throw e;
      }
    };

    async function* iterator() {
      if (consumed) {
        throw new Error('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
      }
      consumed = true;
      let done = false;
      try {
        for await (const sse of _iterSSEMessages(response, controller)) {
          // Once the terminator was seen, keep draining without emitting.
          if (done) {
            continue;
          }
          if (sse.data.startsWith('[DONE]')) {
            done = true;
            continue;
          }

          const isBarePayload =
            sse.event === null ||
            sse.event.startsWith('response.') ||
            sse.event.startsWith('transcript.');
          const data = parsePayload(sse);

          if (isBarePayload) {
            if (data && data.error) {
              throw new error_2.APIError(
                undefined,
                data.error,
                undefined,
                (0, core_1.createResponseHeaders)(response.headers),
              );
            }
            yield data;
            continue;
          }

          // TODO: Is this where the error should be thrown?
          if (sse.event == 'error') {
            throw new error_2.APIError(undefined, data.error, data.message, undefined);
          }
          yield { event: sse.event, data };
        }
        done = true;
      } catch (e) {
        // A user-triggered `stream.controller.abort()` ends iteration quietly;
        // anything else propagates.
        const wasAborted = e instanceof Error && e.name === 'AbortError';
        if (!wasAborted) {
          throw e;
        }
      } finally {
        // Leaving early (e.g. the consumer `break`s) aborts the ongoing request.
        if (!done) {
          controller.abort();
        }
      }
    }

    return new Stream(iterator, controller);
  }

  /**
   * Generates a Stream from a newline-separated ReadableStream
   * where each item is a JSON value.
   *
   * @param readableStream - Source passed to `ReadableStreamToAsyncIterable`.
   * @param controller - Object exposing `abort()`.
   * @returns {Stream} Single-use stream of the parsed values.
   */
  static fromReadableStream(readableStream, controller) {
    let consumed = false;

    // Yields every line produced by the line decoder, including whatever
    // `flush()` returns once the source ends.
    async function* iterLines() {
      const decoder = new line_1.LineDecoder();
      const chunks = (0, stream_utils_1.ReadableStreamToAsyncIterable)(readableStream);
      for await (const chunk of chunks) {
        for (const line of decoder.decode(chunk)) {
          yield line;
        }
      }
      for (const line of decoder.flush()) {
        yield line;
      }
    }

    async function* iterator() {
      if (consumed) {
        throw new Error('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
      }
      consumed = true;
      let done = false;
      try {
        for await (const line of iterLines()) {
          // Empty lines carry no value.
          if (done || !line) {
            continue;
          }
          yield JSON.parse(line);
        }
        done = true;
      } catch (e) {
        // A user-triggered `stream.controller.abort()` ends iteration quietly;
        // anything else propagates.
        const wasAborted = e instanceof Error && e.name === 'AbortError';
        if (!wasAborted) {
          throw e;
        }
      } finally {
        // Leaving early (e.g. the consumer `break`s) aborts the ongoing request.
        if (!done) {
          controller.abort();
        }
      }
    }

    return new Stream(iterator, controller);
  }

  /** Returns a fresh iterator by invoking the stored iterator factory. */
  [Symbol.asyncIterator]() {
    return this.iterator();
  }

  /**
   * Splits the stream into two streams which can be
   * independently read from at different speeds.
   *
   * @returns {[Stream, Stream]} Two streams sharing this stream's controller.
   */
  tee() {
    const leftQueue = [];
    const rightQueue = [];
    const source = this.iterator();

    // Each branch drains its own queue. When that queue is empty, the next
    // result is pulled from the shared source and queued for both branches.
    const makeBranch = (queue) => ({
      next() {
        if (queue.length === 0) {
          const pending = source.next();
          leftQueue.push(pending);
          rightQueue.push(pending);
        }
        return queue.shift();
      },
    });

    const leftStream = new Stream(() => makeBranch(leftQueue), this.controller);
    const rightStream = new Stream(() => makeBranch(rightQueue), this.controller);
    return [leftStream, rightStream];
  }

  /**
   * Converts this stream to a newline-separated ReadableStream of
   * JSON stringified values in the stream
   * which can be turned back into a Stream with `Stream.fromReadableStream()`.
   */
  toReadableStream() {
    const stream = this;
    const textEncoder = new TextEncoder();
    let source;

    return new index_1.ReadableStream({
      async start() {
        source = stream[Symbol.asyncIterator]();
      },
      async pull(ctrl) {
        try {
          const step = await source.next();
          if (step.done) {
            return ctrl.close();
          }
          ctrl.enqueue(textEncoder.encode(`${JSON.stringify(step.value)}\n`));
        } catch (err) {
          ctrl.error(err);
        }
      },
      async cancel() {
        // Give the underlying iterator a chance to run its cleanup.
        await source.return?.();
      },
    });
  }
}
- exports.Stream = Stream;
/**
 * Async generator yielding the server-sent-event messages decoded from
 * `response.body`.
 *
 * The body is cut into chunks by `iterSSEChunks`, each chunk is split into
 * lines by a `LineDecoder`, and each line is fed to an `SSEDecoder`. Every
 * non-null value the SSE decoder returns is yielded, including those produced
 * from the lines returned by the line decoder's final `flush()`.
 *
 * @param response - Object with a `body` to read from.
 * @param controller - Object exposing `abort()`; called when there is no body.
 * @throws OpenAIError when `response.body` is falsy.
 */
async function* _iterSSEMessages(response, controller) {
  if (!response.body) {
    controller.abort();
    throw new error_1.OpenAIError(`Attempted to iterate over a response with no body`);
  }

  const messageDecoder = new SSEDecoder();
  const lineSplitter = new line_1.LineDecoder();

  // Lazily feeds lines to the SSE decoder, yielding each completed message.
  function* completedMessages(lines) {
    for (const line of lines) {
      const message = messageDecoder.decode(line);
      if (message) {
        yield message;
      }
    }
  }

  const source = (0, stream_utils_1.ReadableStreamToAsyncIterable)(response.body);
  for await (const sseChunk of iterSSEChunks(source)) {
    for (const message of completedMessages(lineSplitter.decode(sseChunk))) {
      yield message;
    }
  }
  // Lines still buffered in the line decoder after the body ended.
  for (const message of completedMessages(lineSplitter.flush())) {
    yield message;
  }
}
- exports._iterSSEMessages = _iterSSEMessages;
/**
 * Given an async iterable iterator, iterates over it and yields full
 * SSE chunks, i.e. yields when a double new-line is encountered.
 *
 * Incoming chunks may be `ArrayBuffer`s, strings (encoded with `TextEncoder`)
 * or anything else, which is used as-is; `null`/`undefined` chunks are
 * skipped. Bytes left over when the source ends are yielded as a final chunk.
 *
 * @param iterator - Async iterable of incoming chunks.
 * @yields {Uint8Array} Byte slices ending where `findDoubleNewlineIndex`
 *   reports a boundary.
 */
async function* iterSSEChunks(iterator) {
  let buffered = new Uint8Array();

  for await (const chunk of iterator) {
    if (chunk == null) {
      continue;
    }

    let bytes;
    if (chunk instanceof ArrayBuffer) {
      bytes = new Uint8Array(chunk);
    } else if (typeof chunk === 'string') {
      bytes = new TextEncoder().encode(chunk);
    } else {
      bytes = chunk;
    }

    // Append the new bytes to whatever is still pending.
    const joined = new Uint8Array(buffered.length + bytes.length);
    joined.set(buffered);
    joined.set(bytes, buffered.length);
    buffered = joined;

    // Emit every complete chunk currently in the buffer.
    for (;;) {
      const boundary = (0, line_1.findDoubleNewlineIndex)(buffered);
      if (boundary === -1) {
        break;
      }
      yield buffered.slice(0, boundary);
      buffered = buffered.slice(boundary);
    }
  }

  if (buffered.length > 0) {
    yield buffered;
  }
}
/**
 * Line-by-line accumulator for server-sent-event messages.
 *
 * Feed it one line at a time through `decode()`. Field lines are collected
 * until an empty line arrives, at which point the accumulated message is
 * returned and the state is reset.
 */
class SSEDecoder {
  constructor() {
    this.event = null; // value of the most recent `event` field, if any
    this.data = []; // values of the `data` fields seen so far
    this.chunks = []; // every non-empty line seen so far
  }

  /**
   * Consumes a single line (a trailing `\r` is ignored).
   *
   * @param {string} line
   * @returns `{ event, data, raw }` when an empty line completes a message
   *   that has an event name or data; otherwise `null`.
   */
  decode(line) {
    const text = line.endsWith('\r') ? line.slice(0, -1) : line;

    if (text) {
      this.chunks.push(text);

      // Lines starting with ':' are recorded in `chunks` but set no field.
      if (!text.startsWith(':')) {
        const [fieldname, , rest] = partition(text, ':');
        // A single space right after the colon is not part of the value.
        const value = rest.startsWith(' ') ? rest.substring(1) : rest;
        switch (fieldname) {
          case 'event':
            this.event = value;
            break;
          case 'data':
            this.data.push(value);
            break;
          default:
            break;
        }
      }
      return null;
    }

    // Empty line: nothing to dispatch unless an event name or data was seen.
    if (!(this.event || this.data.length)) {
      return null;
    }

    const message = {
      event: this.event,
      data: this.data.join('\n'),
      raw: this.chunks,
    };
    this.event = null;
    this.data = [];
    this.chunks = [];
    return message;
  }
}
/**
 * Splits `str` around the first occurrence of `delimiter`.
 *
 * @param {string} str
 * @param {string} delimiter
 * @returns {[string, string, string]} `[before, delimiter, after]` when the
 *   delimiter is found, otherwise `[str, '', '']`.
 */
function partition(str, delimiter) {
  const at = str.indexOf(delimiter);
  return at === -1
    ? [str, '', '']
    : [str.slice(0, at), delimiter, str.slice(at + delimiter.length)];
}
- //# sourceMappingURL=streaming.js.map
|