| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350 |
- import sys
- import json
- import time
- from functools import wraps
- from collections.abc import Iterable
- import sentry_sdk
- from sentry_sdk import consts
- from sentry_sdk.ai.monitoring import record_token_usage
- from sentry_sdk.ai.utils import (
- set_data_normalized,
- normalize_message_roles,
- truncate_and_annotate_messages,
- truncate_and_annotate_embedding_inputs,
- )
- from sentry_sdk.ai._openai_completions_api import (
- _is_system_instruction as _is_system_instruction_completions,
- _get_system_instructions as _get_system_instructions_completions,
- _transform_system_instructions,
- _get_text_items,
- )
- from sentry_sdk.ai._openai_responses_api import (
- _is_system_instruction as _is_system_instruction_responses,
- _get_system_instructions as _get_system_instructions_responses,
- )
- from sentry_sdk.consts import SPANDATA
- from sentry_sdk.integrations import DidNotEnable, Integration
- from sentry_sdk.scope import should_send_default_pii
- from sentry_sdk.tracing_utils import set_span_errored
- from sentry_sdk.utils import (
- capture_internal_exceptions,
- event_from_exception,
- safe_serialize,
- reraise,
- )
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from typing import (
- Any,
- List,
- Optional,
- Callable,
- AsyncIterator,
- Iterator,
- Union,
- Iterable,
- )
- from sentry_sdk.tracing import Span
- from sentry_sdk._types import TextPart
- from openai.types.responses.response_usage import ResponseUsage
- from openai.types.responses import (
- ResponseInputParam,
- SequenceNotStr,
- ResponseStreamEvent,
- )
- from openai.types import CompletionUsage
- from openai import Omit
- try:
- try:
- from openai import NotGiven
- except ImportError:
- NotGiven = None
- try:
- from openai import Omit
- except ImportError:
- Omit = None
- from openai.resources.chat.completions import Completions, AsyncCompletions
- from openai.resources import Embeddings, AsyncEmbeddings
- from openai import Stream, AsyncStream
- if TYPE_CHECKING:
- from openai.types.chat import (
- ChatCompletionMessageParam,
- ChatCompletionChunk,
- )
- except ImportError:
- raise DidNotEnable("OpenAI not installed")
- RESPONSES_API_ENABLED = True
- try:
- # responses API support was introduced in v1.66.0
- from openai.resources.responses import Responses, AsyncResponses
- from openai.types.responses.response_completed_event import ResponseCompletedEvent
- except ImportError:
- RESPONSES_API_ENABLED = False
class OpenAIIntegration(Integration):
    """Integration that instruments the OpenAI client's ``create`` methods.

    ``include_prompts`` is consulted (together with the SDK's PII setting)
    before prompt or response content is attached to spans.
    ``tiktoken_encoding_name`` optionally enables local token counting
    through ``tiktoken``; without it :meth:`count_tokens` always returns 0.
    """

    identifier = "openai"
    origin = f"auto.ai.{identifier}"

    def __init__(
        self: "OpenAIIntegration",
        include_prompts: bool = True,
        tiktoken_encoding_name: "Optional[str]" = None,
    ) -> None:
        """Store the prompt policy and, if requested, load a tiktoken encoding."""
        self.include_prompts = include_prompts

        self.tiktoken_encoding = None
        if tiktoken_encoding_name is not None:
            # Imported lazily so tiktoken is only required when it is used.
            import tiktoken  # type: ignore

            self.tiktoken_encoding = tiktoken.get_encoding(tiktoken_encoding_name)

    @staticmethod
    def setup_once() -> None:
        """Replace the ``create`` method of each supported resource class
        with its instrumented wrapper."""
        patch_table = [
            (Completions, _wrap_chat_completion_create),
            (AsyncCompletions, _wrap_async_chat_completion_create),
            (Embeddings, _wrap_embeddings_create),
            (AsyncEmbeddings, _wrap_async_embeddings_create),
        ]

        # The Responses resources only exist in newer ``openai`` releases.
        if RESPONSES_API_ENABLED:
            patch_table.append((Responses, _wrap_responses_create))
            patch_table.append((AsyncResponses, _wrap_async_responses_create))

        for resource_cls, make_wrapper in patch_table:
            resource_cls.create = make_wrapper(resource_cls.create)

    def count_tokens(self: "OpenAIIntegration", s: str) -> int:
        """Return the number of tokens in ``s`` using the configured encoding.

        Returns 0 when no encoding is configured or when encoding fails.
        """
        encoding = self.tiktoken_encoding
        if encoding is None:
            return 0

        try:
            return len(encoding.encode_ordinary(s))
        except Exception:
            # Best effort: token counting must never break the instrumented call.
            return 0
def _capture_exception(exc: "Any", manual_span_cleanup: bool = True) -> None:
    """Mark the current span as errored and report ``exc`` to Sentry.

    :param exc: The exception to report.
    :param manual_span_cleanup: When true, the current span (if any) is
        exited here, before the event is captured.
    """
    span = sentry_sdk.get_current_span()
    set_span_errored(span)

    # Spans in this integration are entered by hand rather than through the
    # ``start_span`` context manager, so a still-open span is closed by hand.
    if manual_span_cleanup and span is not None:
        span.__exit__(None, None, None)

    client_options = sentry_sdk.get_client().options
    event, hint = event_from_exception(
        exc,
        client_options=client_options,
        mechanism={"type": "openai", "handled": False},
    )
    sentry_sdk.capture_event(event, hint=hint)
- def _has_attr_and_is_int(
- token_usage: "Union[CompletionUsage, ResponseUsage]", attr_name: str
- ) -> bool:
- return hasattr(token_usage, attr_name) and isinstance(
- getattr(token_usage, attr_name, None), int
- )
def _calculate_completions_token_usage(
    messages: "Optional[Iterable[ChatCompletionMessageParam]]",
    response: "Any",
    span: "Span",
    streaming_message_responses: "Optional[List[str]]",
    streaming_message_total_token_usage: "Optional[CompletionUsage]",
    count_tokens: "Callable[..., Any]",
) -> None:
    """Extract and record token usage from a Chat Completions API response.

    Usage comes from ``streaming_message_total_token_usage`` when given,
    otherwise from ``response.usage``. Input or output counts that are still
    zero afterwards are estimated with ``count_tokens``. Counts that end up
    zero are passed to ``record_token_usage`` as ``None``.
    """
    input_tokens: "Optional[int]" = 0
    input_tokens_cached: "Optional[int]" = 0
    output_tokens: "Optional[int]" = 0
    output_tokens_reasoning: "Optional[int]" = 0
    total_tokens: "Optional[int]" = 0

    # Usage collected while streaming takes precedence over the response's own.
    if streaming_message_total_token_usage is not None:
        usage = streaming_message_total_token_usage
    elif hasattr(response, "usage"):
        usage = response.usage
    else:
        usage = None

    if usage is not None:
        if _has_attr_and_is_int(usage, "prompt_tokens"):
            input_tokens = usage.prompt_tokens
        if _has_attr_and_is_int(usage, "completion_tokens"):
            output_tokens = usage.completion_tokens
        if _has_attr_and_is_int(usage, "total_tokens"):
            total_tokens = usage.total_tokens

        if hasattr(usage, "prompt_tokens_details"):
            cached_count = getattr(usage.prompt_tokens_details, "cached_tokens", None)
            if isinstance(cached_count, int):
                input_tokens_cached = cached_count

        if hasattr(usage, "completion_tokens_details"):
            reasoning_count = getattr(
                usage.completion_tokens_details, "reasoning_tokens", None
            )
            if isinstance(reasoning_count, int):
                output_tokens_reasoning = reasoning_count

    # Fallback: estimate input tokens from the request messages.
    if input_tokens == 0:
        for message in messages or []:
            if isinstance(message, str):
                input_tokens += count_tokens(message)
            elif isinstance(message, dict):
                content = message.get("content")
                if content is not None:
                    input_tokens += sum(
                        count_tokens(text) for text in _get_text_items(content)
                    )

    # Fallback: estimate output tokens from the generated text.
    if output_tokens == 0:
        if streaming_message_responses is not None:
            for streamed_text in streaming_message_responses:
                output_tokens += count_tokens(streamed_text)
        elif hasattr(response, "choices"):
            for choice in response.choices:
                if hasattr(choice, "message") and hasattr(choice.message, "content"):
                    output_tokens += count_tokens(choice.message.content)

    # Zero counts are reported as ``None`` so that no token data is set for them.
    record_token_usage(
        span,
        input_tokens=input_tokens or None,
        input_tokens_cached=input_tokens_cached or None,
        output_tokens=output_tokens or None,
        output_tokens_reasoning=output_tokens_reasoning or None,
        total_tokens=total_tokens or None,
    )
def _calculate_responses_token_usage(
    input: "Any",
    response: "Any",
    span: "Span",
    streaming_message_responses: "Optional[List[str]]",
    count_tokens: "Callable[..., Any]",
) -> None:
    """Extract and record token usage from a Responses API response.

    Counts reported on ``response.usage`` are used when present. Input or
    output counts that are still zero afterwards are estimated with
    ``count_tokens``. Counts that end up zero are passed to
    ``record_token_usage`` as ``None``.

    :param input: The request input: a plain string, a list of strings
        and/or message dicts, or ``None``.
    :param response: The response object (or stream) to read usage from.
    :param span: The span the usage is recorded on.
    :param streaming_message_responses: Text collected while streaming, if any.
    :param count_tokens: Callable returning the token count of a string.
    """
    input_tokens: "Optional[int]" = 0
    input_tokens_cached: "Optional[int]" = 0
    output_tokens: "Optional[int]" = 0
    output_tokens_reasoning: "Optional[int]" = 0
    total_tokens: "Optional[int]" = 0

    if hasattr(response, "usage"):
        usage = response.usage

        if _has_attr_and_is_int(usage, "input_tokens"):
            input_tokens = usage.input_tokens
        if _has_attr_and_is_int(usage, "output_tokens"):
            output_tokens = usage.output_tokens
        if _has_attr_and_is_int(usage, "total_tokens"):
            total_tokens = usage.total_tokens

        if hasattr(usage, "input_tokens_details"):
            cached = getattr(usage.input_tokens_details, "cached_tokens", None)
            if isinstance(cached, int):
                input_tokens_cached = cached

        if hasattr(usage, "output_tokens_details"):
            reasoning = getattr(usage.output_tokens_details, "reasoning_tokens", None)
            if isinstance(reasoning, int):
                output_tokens_reasoning = reasoning

    # Manually count input tokens
    if input_tokens == 0:
        # The input may be a bare string. Wrap it so the whole prompt is
        # tokenized once; iterating the string itself would count every
        # character separately and inflate the result.
        input_items = [input] if isinstance(input, str) else (input or [])

        for message in input_items:
            if isinstance(message, str):
                input_tokens += count_tokens(message)
            elif isinstance(message, dict):
                message_content = message.get("content")
                if message_content is None:
                    continue
                # Deliberate use of Completions function for both Completions and Responses input format.
                text_items = _get_text_items(message_content)
                input_tokens += sum(count_tokens(text) for text in text_items)

    # Manually count output tokens
    if output_tokens == 0:
        if streaming_message_responses is not None:
            for message in streaming_message_responses:
                output_tokens += count_tokens(message)
        elif hasattr(response, "output"):
            # ``output`` and ``content`` are treated as empty when they are
            # ``None`` so that such items are skipped instead of raising.
            for output_item in response.output or []:
                for content_item in getattr(output_item, "content", None) or []:
                    if hasattr(content_item, "text"):
                        output_tokens += count_tokens(content_item.text)

    # Do not set token data if it is 0
    input_tokens = input_tokens or None
    input_tokens_cached = input_tokens_cached or None
    output_tokens = output_tokens or None
    output_tokens_reasoning = output_tokens_reasoning or None
    total_tokens = total_tokens or None

    record_token_usage(
        span,
        input_tokens=input_tokens,
        input_tokens_cached=input_tokens_cached,
        output_tokens=output_tokens,
        output_tokens_reasoning=output_tokens_reasoning,
        total_tokens=total_tokens,
    )
def _set_responses_api_input_data(
    span: "Span",
    kwargs: "dict[str, Any]",
    integration: "OpenAIIntegration",
) -> None:
    """Record the request side of a Responses API call on ``span``.

    Request parameters (tools, model, max output tokens, temperature, top_p)
    are recorded whenever they are provided. System instructions and input
    messages are recorded only when sending PII is allowed and the
    integration has ``include_prompts`` enabled. The operation name
    ``"responses"`` is set last on every path.
    """

    def _mark_operation() -> None:
        set_data_normalized(span, SPANDATA.GEN_AI_OPERATION_NAME, "responses")

    explicit_instructions: "Union[Optional[str], Omit]" = kwargs.get("instructions")
    messages: "Optional[Union[str, ResponseInputParam]]" = kwargs.get("input")

    tools = kwargs.get("tools")
    if tools is not None and _is_given(tools) and len(tools) > 0:
        set_data_normalized(
            span, SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools)
        )

    model = kwargs.get("model")
    if model is not None:
        span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model)

    # Optional scalar request parameters, recorded in this order.
    scalar_params = (
        ("max_output_tokens", SPANDATA.GEN_AI_REQUEST_MAX_TOKENS),
        ("temperature", SPANDATA.GEN_AI_REQUEST_TEMPERATURE),
        ("top_p", SPANDATA.GEN_AI_REQUEST_TOP_P),
    )
    for kwarg_name, span_key in scalar_params:
        value = kwargs.get(kwarg_name)
        if value is not None and _is_given(value):
            span.set_data(span_key, value)

    # Everything below is prompt content and therefore gated.
    if not (should_send_default_pii() and integration.include_prompts):
        _mark_operation()
        return

    has_explicit_instructions = explicit_instructions is not None and _is_given(
        explicit_instructions
    )

    if messages is None:
        # No input: only explicit instructions (if any) can be recorded.
        if has_explicit_instructions:
            span.set_data(
                SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
                json.dumps(
                    [
                        {
                            "type": "text",
                            "content": explicit_instructions,
                        }
                    ]
                ),
            )
        _mark_operation()
        return

    instructions_text_parts: "list[TextPart]" = []
    if has_explicit_instructions:
        instructions_text_parts.append(
            {
                "type": "text",
                "content": explicit_instructions,
            }
        )

    system_instructions = _get_system_instructions_responses(messages)
    # Deliberate use of function accepting completions API type because
    # of shared structure FOR THIS PURPOSE ONLY.
    instructions_text_parts += _transform_system_instructions(system_instructions)

    if len(instructions_text_parts) > 0:
        span.set_data(
            SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
            json.dumps(instructions_text_parts),
        )

    # A bare string is recorded as a single message; for a list only the
    # entries that are not system instructions are recorded.
    if isinstance(messages, str):
        recordable = [messages]
    else:
        recordable = [
            entry for entry in messages if not _is_system_instruction_responses(entry)
        ]

    if len(recordable) > 0:
        normalized_messages = normalize_message_roles(recordable)  # type: ignore
        scope = sentry_sdk.get_current_scope()
        messages_data = truncate_and_annotate_messages(normalized_messages, span, scope)
        if messages_data is not None:
            set_data_normalized(
                span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False
            )

    _mark_operation()
def _set_completions_api_input_data(
    span: "Span",
    kwargs: "dict[str, Any]",
    integration: "OpenAIIntegration",
) -> None:
    """Record the request side of a Chat Completions call on ``span``.

    Request parameters are recorded whenever they are provided. System
    instructions and messages are recorded only when sending PII is allowed,
    the integration has ``include_prompts`` enabled and ``messages`` is set.
    A non-string, non-dict iterable of messages is materialized into a list
    and written back to ``kwargs["messages"]``. The operation name ``"chat"``
    is set last on every path.
    """

    def _mark_operation() -> None:
        set_data_normalized(span, SPANDATA.GEN_AI_OPERATION_NAME, "chat")

    messages: "Optional[Union[str, Iterable[ChatCompletionMessageParam]]]" = kwargs.get(
        "messages"
    )

    tools = kwargs.get("tools")
    if tools is not None and _is_given(tools) and len(tools) > 0:
        set_data_normalized(
            span, SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools)
        )

    model = kwargs.get("model")
    if model is not None:
        span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model)

    # Optional scalar request parameters, recorded in this order.
    scalar_params = (
        ("max_tokens", SPANDATA.GEN_AI_REQUEST_MAX_TOKENS),
        ("presence_penalty", SPANDATA.GEN_AI_REQUEST_PRESENCE_PENALTY),
        ("frequency_penalty", SPANDATA.GEN_AI_REQUEST_FREQUENCY_PENALTY),
        ("temperature", SPANDATA.GEN_AI_REQUEST_TEMPERATURE),
        ("top_p", SPANDATA.GEN_AI_REQUEST_TOP_P),
    )
    for kwarg_name, span_key in scalar_params:
        value = kwargs.get(kwarg_name)
        if value is not None and _is_given(value):
            span.set_data(span_key, value)

    # Everything below is prompt content and therefore gated.
    prompts_allowed = should_send_default_pii() and integration.include_prompts
    if not prompts_allowed or messages is None:
        _mark_operation()
        return

    if isinstance(messages, str):
        recordable = [messages]
    elif not isinstance(messages, Iterable) or isinstance(messages, dict):
        # dict special case following https://github.com/openai/openai-python/blob/3e0c05b84a2056870abf3bd6a5e7849020209cc3/src/openai/_utils/_transform.py#L194-L197
        _mark_operation()
        return
    else:
        # Materialize the iterable and hand the list back through kwargs so
        # the wrapped call still receives all messages after we iterate here.
        messages = list(messages)
        kwargs["messages"] = messages

        system_instructions = _get_system_instructions_completions(messages)
        if len(system_instructions) > 0:
            span.set_data(
                SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
                json.dumps(_transform_system_instructions(system_instructions)),
            )

        recordable = [
            entry
            for entry in messages
            if not _is_system_instruction_completions(entry)
        ]

    if len(recordable) > 0:
        normalized_messages = normalize_message_roles(recordable)  # type: ignore
        scope = sentry_sdk.get_current_scope()
        messages_data = truncate_and_annotate_messages(normalized_messages, span, scope)
        if messages_data is not None:
            set_data_normalized(
                span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False
            )

    _mark_operation()
def _set_embeddings_input_data(
    span: "Span",
    kwargs: "dict[str, Any]",
    integration: "OpenAIIntegration",
) -> None:
    """Record the request side of an Embeddings call on ``span``.

    The model is recorded whenever it is provided. The embedding input is
    recorded only when sending PII is allowed, the integration has
    ``include_prompts`` enabled and ``input`` is set. A non-string, non-dict
    iterable input is materialized into a list and written back to
    ``kwargs["input"]``. The operation name ``"embeddings"`` is set on every
    path.
    """

    def _mark_operation() -> None:
        set_data_normalized(span, SPANDATA.GEN_AI_OPERATION_NAME, "embeddings")

    def _record_inputs(inputs: "Any") -> None:
        normalized_inputs = normalize_message_roles(inputs)  # type: ignore
        scope = sentry_sdk.get_current_scope()
        inputs_data = truncate_and_annotate_embedding_inputs(
            normalized_inputs, span, scope
        )
        if inputs_data is not None:
            set_data_normalized(
                span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, inputs_data, unpack=False
            )

    messages: "Union[str, SequenceNotStr[str], Iterable[int], Iterable[Iterable[int]]]" = kwargs.get(
        "input"
    )

    model = kwargs.get("model")
    if model is not None:
        span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model)

    prompts_allowed = should_send_default_pii() and integration.include_prompts
    if not prompts_allowed or messages is None:
        _mark_operation()
        return

    if isinstance(messages, str):
        # On this path the operation name is set before the input.
        _mark_operation()
        _record_inputs([messages])
        return

    # dict special case following https://github.com/openai/openai-python/blob/3e0c05b84a2056870abf3bd6a5e7849020209cc3/src/openai/_utils/_transform.py#L194-L197
    if not isinstance(messages, Iterable) or isinstance(messages, dict):
        _mark_operation()
        return

    # Materialize the iterable and hand the list back through kwargs so the
    # wrapped call still receives the full input after we iterate here.
    messages = list(messages)
    kwargs["input"] = messages

    if len(messages) > 0:
        _record_inputs(messages)

    _mark_operation()
def _set_common_output_data(
    span: "Span",
    response: "Any",
    input: "Any",
    integration: "OpenAIIntegration",
    finish_span: bool = True,
) -> None:
    """Record response data and token usage on ``span``.

    The response kind is detected by attribute: ``choices`` means Chat
    Completions, otherwise ``output`` means Responses, otherwise the response
    is handled like an Embeddings result. Response content is only attached
    when sending PII is allowed and the integration has ``include_prompts``
    enabled. The span is exited at the end when ``finish_span`` is true.
    """
    if hasattr(response, "model"):
        set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_MODEL, response.model)

    is_chat_completion = hasattr(response, "choices")

    if not is_chat_completion and hasattr(response, "output"):
        # Responses API
        if should_send_default_pii() and integration.include_prompts:
            tool_calls: "list[Any]" = []
            response_texts: "list[Any]" = []

            for output in response.output:
                if output.type == "function_call":
                    tool_calls.append(output.dict())
                elif output.type == "message":
                    for output_message in output.content:
                        try:
                            response_texts.append(output_message.text)
                        except AttributeError:
                            # Unknown output message type, just return the json
                            response_texts.append(output_message.dict())

            if len(tool_calls) > 0:
                set_data_normalized(
                    span,
                    SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
                    tool_calls,
                    unpack=False,
                )

            if len(response_texts) > 0:
                set_data_normalized(
                    span, SPANDATA.GEN_AI_RESPONSE_TEXT, response_texts
                )

        _calculate_responses_token_usage(
            input=input,
            response=response,
            span=span,
            streaming_message_responses=None,
            count_tokens=integration.count_tokens,
        )
    else:
        # Chat Completions API, or the Embeddings fallback for responses with
        # neither ``choices`` nor ``output``; both share the usage calculation.
        if (
            is_chat_completion
            and should_send_default_pii()
            and integration.include_prompts
        ):
            response_text = [
                choice.message.model_dump()
                for choice in response.choices
                if choice.message is not None
            ]
            if len(response_text) > 0:
                set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, response_text)

        _calculate_completions_token_usage(
            messages=input,
            response=response,
            span=span,
            streaming_message_responses=None,
            streaming_message_total_token_usage=None,
            count_tokens=integration.count_tokens,
        )

    if finish_span:
        span.__exit__(None, None, None)
def _new_chat_completion_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
    """Generator holding the instrumentation shared by chat completion wrappers.

    The generator yields ``(f, args, kwargs)`` at most once and expects the
    result of the call to be sent back in; that result is returned as the
    generator's return value. When the integration is not enabled, or the
    call is invalid (no ``messages`` keyword, or ``messages`` not iterable),
    ``f`` is called directly here and its result is returned without
    yielding.

    For streaming results the response's ``_iterator`` is replaced with a
    wrapping iterator that finishes the span; otherwise the output data is
    recorded and the span is finished immediately.
    """
    integration = sentry_sdk.get_client().get_integration(OpenAIIntegration)
    if integration is None:
        return f(*args, **kwargs)

    if "messages" not in kwargs:
        # invalid call (in all versions of openai), let it return error
        return f(*args, **kwargs)

    try:
        iter(kwargs["messages"])
    except TypeError:
        # invalid call (in all versions), messages must be iterable
        return f(*args, **kwargs)

    model = kwargs.get("model")

    # The span is entered by hand because it has to outlive this function
    # when the response is a stream; it is exited by whoever finishes it.
    span = sentry_sdk.start_span(
        op=consts.OP.GEN_AI_CHAT,
        name=f"chat {model}",
        origin=OpenAIIntegration.origin,
    )
    span.__enter__()

    span.set_data(SPANDATA.GEN_AI_SYSTEM, "openai")

    # Same bool handling as in https://github.com/openai/openai-python/blob/acd0c54d8a68efeedde0e5b4e6c310eef1ce7867/src/openai/resources/completions.py#L585
    is_streaming_response = kwargs.get("stream", False) or False
    span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, is_streaming_response)

    _set_completions_api_input_data(span, kwargs, integration)

    # Reference point for the time-to-first-token measurement of streams.
    start_time = time.perf_counter()
    # Hand the call back to the driver; it sends the call's result in.
    response = yield f, args, kwargs

    # Attribute check to fail gracefully if the attribute is not present in future `openai` versions.
    if isinstance(response, Stream) and hasattr(response, "_iterator"):
        messages = kwargs.get("messages")

        # A bare string is wrapped so it is treated as a single message.
        if messages is not None and isinstance(messages, str):
            messages = [messages]

        response._iterator = _wrap_synchronous_completions_chunk_iterator(
            span=span,
            integration=integration,
            start_time=start_time,
            messages=messages,
            response=response,
            old_iterator=response._iterator,
            finish_span=True,
        )
    # Attribute check to fail gracefully if the attribute is not present in future `openai` versions.
    elif isinstance(response, AsyncStream) and hasattr(response, "_iterator"):
        messages = kwargs.get("messages")

        # A bare string is wrapped so it is treated as a single message.
        if messages is not None and isinstance(messages, str):
            messages = [messages]

        response._iterator = _wrap_asynchronous_completions_chunk_iterator(
            span=span,
            integration=integration,
            start_time=start_time,
            messages=messages,
            response=response,
            old_iterator=response._iterator,
            finish_span=True,
        )
    else:
        # Non-streaming result: record the output and finish the span now.
        _set_completions_api_output_data(
            span, response, kwargs, integration, finish_span=True
        )

    return response
def _set_completions_api_output_data(
    span: "Span",
    response: "Any",
    kwargs: "dict[str, Any]",
    integration: "OpenAIIntegration",
    finish_span: bool = True,
) -> None:
    """Record output data for a Chat Completions call.

    Reads ``messages`` from ``kwargs`` (wrapping a bare string into a
    one-element list) and delegates to ``_set_common_output_data``.
    """
    request_messages = kwargs.get("messages")
    if isinstance(request_messages, str):
        # A bare string stands for a single message.
        request_messages = [request_messages]

    _set_common_output_data(
        span,
        response,
        request_messages,
        integration,
        finish_span,
    )
def _wrap_synchronous_completions_chunk_iterator(
    span: "Span",
    integration: "OpenAIIntegration",
    start_time: "Optional[float]",
    messages: "Optional[Iterable[ChatCompletionMessageParam]]",
    response: "Stream[ChatCompletionChunk]",
    old_iterator: "Iterator[ChatCompletionChunk]",
    finish_span: "bool",
) -> "Iterator[ChatCompletionChunk]":
    """
    Sets information received while iterating the response stream on the AI Client Span.
    Compute token count based on inputs and outputs using tiktoken if token counts are not in the model response.
    Responsible for closing the AI Client Span if instructed to by the `finish_span` argument.

    Every chunk of `old_iterator` is yielded unchanged. The span data is
    written only after the stream has been fully consumed.

    :param start_time: `time.perf_counter()` reference for the time to first
        token; no such measurement is taken when it is `None`.
    :param messages: Request messages, used for the fallback input token count.
    """
    ttft = None
    data_buf: "list[list[str]]" = []  # one for each choice
    streaming_message_total_token_usage = None

    for x in old_iterator:
        # Guarded so that a chunk without `model` cannot raise into the
        # caller's iteration; instrumentation must not break the stream.
        if hasattr(x, "model"):
            span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, x.model)

        with capture_internal_exceptions():
            if hasattr(x, "choices"):
                for choice_index, choice in enumerate(x.choices):
                    if not (
                        hasattr(choice, "delta") and hasattr(choice.delta, "content")
                    ):
                        continue

                    if start_time is not None and ttft is None:
                        ttft = time.perf_counter() - start_time

                    # Pad the buffer up to this position. Growing it by a
                    # single list is not enough when an earlier choice had no
                    # delta, and indexing would then raise IndexError.
                    while len(data_buf) <= choice_index:
                        data_buf.append([])
                    data_buf[choice_index].append(choice.delta.content or "")

            if hasattr(x, "usage"):
                streaming_message_total_token_usage = x.usage

        yield x

    with capture_internal_exceptions():
        if ttft is not None:
            set_data_normalized(
                span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft
            )

        all_responses = None
        if len(data_buf) > 0:
            # One joined text per choice position.
            all_responses = ["".join(chunk) for chunk in data_buf]

            if should_send_default_pii() and integration.include_prompts:
                set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses)

        _calculate_completions_token_usage(
            messages=messages,
            response=response,
            span=span,
            streaming_message_responses=all_responses,
            streaming_message_total_token_usage=streaming_message_total_token_usage,
            count_tokens=integration.count_tokens,
        )

    if finish_span:
        span.__exit__(None, None, None)
async def _wrap_asynchronous_completions_chunk_iterator(
    span: "Span",
    integration: "OpenAIIntegration",
    start_time: "Optional[float]",
    messages: "Optional[Iterable[ChatCompletionMessageParam]]",
    response: "AsyncStream[ChatCompletionChunk]",
    old_iterator: "AsyncIterator[ChatCompletionChunk]",
    finish_span: "bool",
) -> "AsyncIterator[ChatCompletionChunk]":
    """
    Sets information received while iterating the response stream on the AI Client Span.
    Compute token count based on inputs and outputs using tiktoken if token counts are not in the model response.
    Responsible for closing the AI Client Span if instructed to by the `finish_span` argument.

    Every chunk of `old_iterator` is yielded unchanged. The span data is
    written only after the stream has been fully consumed.

    :param start_time: `time.perf_counter()` reference for the time to first
        token; no such measurement is taken when it is `None`.
    :param messages: Request messages, used for the fallback input token count.
    """
    ttft = None
    data_buf: "list[list[str]]" = []  # one for each choice
    streaming_message_total_token_usage = None

    async for x in old_iterator:
        # Guarded so that a chunk without `model` cannot raise into the
        # caller's iteration; instrumentation must not break the stream.
        if hasattr(x, "model"):
            span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, x.model)

        with capture_internal_exceptions():
            if hasattr(x, "choices"):
                for choice_index, choice in enumerate(x.choices):
                    if not (
                        hasattr(choice, "delta") and hasattr(choice.delta, "content")
                    ):
                        continue

                    if start_time is not None and ttft is None:
                        ttft = time.perf_counter() - start_time

                    # Pad the buffer up to this position. Growing it by a
                    # single list is not enough when an earlier choice had no
                    # delta, and indexing would then raise IndexError.
                    while len(data_buf) <= choice_index:
                        data_buf.append([])
                    data_buf[choice_index].append(choice.delta.content or "")

            if hasattr(x, "usage"):
                streaming_message_total_token_usage = x.usage

        yield x

    with capture_internal_exceptions():
        if ttft is not None:
            set_data_normalized(
                span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft
            )

        all_responses = None
        if len(data_buf) > 0:
            # One joined text per choice position.
            all_responses = ["".join(chunk) for chunk in data_buf]

            if should_send_default_pii() and integration.include_prompts:
                set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses)

        _calculate_completions_token_usage(
            messages=messages,
            response=response,
            span=span,
            streaming_message_responses=all_responses,
            streaming_message_total_token_usage=streaming_message_total_token_usage,
            count_tokens=integration.count_tokens,
        )

    if finish_span:
        span.__exit__(None, None, None)
def _wrap_synchronous_responses_event_iterator(
    span: "Span",
    integration: "OpenAIIntegration",
    start_time: "Optional[float]",
    input: "Optional[Union[str, ResponseInputParam]]",
    response: "Stream[ResponseStreamEvent]",
    old_iterator: "Iterator[ResponseStreamEvent]",
    finish_span: "bool",
) -> "Iterator[ResponseStreamEvent]":
    """
    Pass every event of the response stream through unchanged while collecting data for the AI Client Span.
    Token usage is taken from the completed-response event when one arrives; otherwise it is computed
    from the collected deltas once the stream is exhausted.
    Closes the AI Client Span at the end if `finish_span` is set.
    """
    ttft = None
    delta_chunks: "list[str]" = []  # all deltas, in arrival order
    usage_reported = False

    for event in old_iterator:
        with capture_internal_exceptions():
            if hasattr(event, "delta"):
                # The first delta marks the time to first token.
                if start_time is not None and ttft is None:
                    ttft = time.perf_counter() - start_time
                delta_chunks.append(event.delta or "")

            if isinstance(event, ResponseCompletedEvent):
                completed_response = event.response
                span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, completed_response.model)
                _calculate_responses_token_usage(
                    input=input,
                    response=completed_response,
                    span=span,
                    streaming_message_responses=None,
                    count_tokens=integration.count_tokens,
                )
                usage_reported = True

        yield event

    with capture_internal_exceptions():
        if ttft is not None:
            set_data_normalized(
                span, SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft
            )

        if delta_chunks:
            # All deltas are joined into one response text.
            all_responses = ["".join(delta_chunks)]

            if should_send_default_pii() and integration.include_prompts:
                set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses)

            # No completed-response event reported usage: count it here.
            if not usage_reported:
                _calculate_responses_token_usage(
                    input=input,
                    response=response,
                    span=span,
                    streaming_message_responses=all_responses,
                    count_tokens=integration.count_tokens,
                )

    if finish_span:
        span.__exit__(None, None, None)
async def _wrap_asynchronous_responses_event_iterator(
    span: "Span",
    integration: "OpenAIIntegration",
    start_time: "Optional[float]",
    input: "Optional[Union[str, ResponseInputParam]]",
    response: "AsyncStream[ResponseStreamEvent]",
    old_iterator: "AsyncIterator[ResponseStreamEvent]",
    finish_span: "bool",
) -> "AsyncIterator[ResponseStreamEvent]":
    """
    Re-yield every event of an asynchronous Responses API stream unchanged
    while recording what was observed on the AI Client Span.

    While iterating, the time to the first event carrying a `delta` is measured
    relative to `start_time`, and the deltas are buffered. A
    `ResponseCompletedEvent` sets the response model on the span and hands its
    `response` to `_calculate_responses_token_usage`.

    Once the stream is exhausted, the time to first token and (only if PII
    sending and `integration.include_prompts` allow it) the joined response
    text are written to the span. If no completed event was seen but deltas
    were buffered, token usage is calculated from the buffered text instead.

    The span is exited at the very end when `finish_span` is true.
    """
    first_token_latency: "Optional[float]" = None
    # List of per-output chunk lists; this wrapper only ever fills index 0.
    text_chunks: "list[list[str]]" = []
    # Stays True until a ResponseCompletedEvent has supplied the usage data.
    usage_pending = True

    async for event in old_iterator:
        # Instrumentation errors must never break the caller's stream.
        with capture_internal_exceptions():
            if hasattr(event, "delta"):
                if first_token_latency is None and start_time is not None:
                    first_token_latency = time.perf_counter() - start_time
                if not text_chunks:
                    text_chunks.append([])
                text_chunks[0].append(event.delta or "")

            if isinstance(event, ResponseCompletedEvent):
                span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, event.response.model)
                _calculate_responses_token_usage(
                    input=input,
                    response=event.response,
                    span=span,
                    streaming_message_responses=None,
                    count_tokens=integration.count_tokens,
                )
                usage_pending = False

        yield event

    with capture_internal_exceptions():
        if first_token_latency is not None:
            set_data_normalized(
                span,
                SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN,
                first_token_latency,
            )

        if text_chunks:
            all_responses = ["".join(parts) for parts in text_chunks]

            if should_send_default_pii() and integration.include_prompts:
                set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, all_responses)

            if usage_pending:
                # No completed event arrived: fall back to the buffered text.
                _calculate_responses_token_usage(
                    input=input,
                    response=response,
                    span=span,
                    streaming_message_responses=all_responses,
                    count_tokens=integration.count_tokens,
                )

    if finish_span:
        span.__exit__(None, None, None)
def _set_responses_api_output_data(
    span: "Span",
    response: "Any",
    kwargs: "dict[str, Any]",
    integration: "OpenAIIntegration",
    finish_span: bool = True,
) -> None:
    """
    Forward a Responses API result to `_set_common_output_data`.

    The request's `input` keyword argument is looked up in `kwargs`; a plain
    string is wrapped in a one-element list, any other value (including a
    missing one) is passed on as-is.
    """
    request_input = kwargs.get("input")
    if isinstance(request_input, str):
        request_input = [request_input]

    _set_common_output_data(span, response, request_input, integration, finish_span)
def _set_embeddings_output_data(
    span: "Span",
    response: "Any",
    kwargs: "dict[str, Any]",
    integration: "OpenAIIntegration",
    finish_span: bool = True,
) -> None:
    """
    Forward an embeddings result to `_set_common_output_data`.

    The request's `input` keyword argument is looked up in `kwargs`; a plain
    string is wrapped in a one-element list, any other value (including a
    missing one) is passed on as-is.
    """
    request_input = kwargs.get("input")
    if isinstance(request_input, str):
        request_input = [request_input]

    _set_common_output_data(span, response, request_input, integration, finish_span)
def _wrap_chat_completion_create(f: "Callable[..., Any]") -> "Callable[..., Any]":
    """
    Build the instrumented replacement for a synchronous chat completion
    `create` callable.

    The returned function calls `f` directly when the OpenAI integration is
    not enabled or when `messages` was not passed as a keyword argument.
    Otherwise the call is driven through the `_new_chat_completion_common`
    generator.
    """

    def _execute_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        flow = _new_chat_completion_common(f, *args, **kwargs)

        # First step: the generator either yields the call to perform or
        # finishes right away, in which case its return value is the result.
        try:
            target, call_args, call_kwargs = next(flow)
        except StopIteration as finished:
            return finished.value

        try:
            try:
                outcome = target(*call_args, **call_kwargs)
            except Exception as error:
                failure = sys.exc_info()
                with capture_internal_exceptions():
                    _capture_exception(error)
                reraise(*failure)

            # Second step: feed the result back; the generator is expected to
            # finish and deliver the final value through StopIteration.
            return flow.send(outcome)
        except StopIteration as finished:
            return finished.value

    @wraps(f)
    def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any":
        integration = sentry_sdk.get_client().get_integration(OpenAIIntegration)
        if integration is not None and "messages" in kwargs:
            return _execute_sync(f, *args, **kwargs)

        # no "messages" means invalid call (in all versions of openai), let it return error
        return f(*args, **kwargs)

    return _sentry_patched_create_sync
def _wrap_async_chat_completion_create(f: "Callable[..., Any]") -> "Callable[..., Any]":
    """
    Build the instrumented replacement for an asynchronous chat completion
    `create` callable.

    The returned coroutine function awaits `f` directly when the OpenAI
    integration is not enabled or when `messages` was not passed as a keyword
    argument. Otherwise the call is driven through the
    `_new_chat_completion_common` generator.
    """

    async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        flow = _new_chat_completion_common(f, *args, **kwargs)

        # First step: if the generator finishes without yielding, its return
        # value is awaited and used as the result.
        try:
            target, call_args, call_kwargs = next(flow)
        except StopIteration as finished:
            return await finished.value

        try:
            try:
                outcome = await target(*call_args, **call_kwargs)
            except Exception as error:
                failure = sys.exc_info()
                with capture_internal_exceptions():
                    _capture_exception(error)
                reraise(*failure)

            # Second step: feed the result back; the generator is expected to
            # finish and deliver the final value through StopIteration.
            return flow.send(outcome)
        except StopIteration as finished:
            return finished.value

    @wraps(f)
    async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any":
        integration = sentry_sdk.get_client().get_integration(OpenAIIntegration)
        if integration is not None and "messages" in kwargs:
            return await _execute_async(f, *args, **kwargs)

        # no "messages" means invalid call (in all versions of openai), let it return error
        return await f(*args, **kwargs)

    return _sentry_patched_create_async
def _new_embeddings_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
    """
    Generator holding the instrumentation around an embeddings `create` call.

    Without an enabled OpenAI integration it finishes immediately, returning
    `f(*args, **kwargs)`. Otherwise it opens an embeddings span, records the
    input data, yields `(f, args, kwargs)` so the driver can perform the call,
    and expects the call's result to be sent back. The output data is then
    recorded and the result is returned as the generator's value.

    The span is managed by the `with` block, which is why the output helper is
    called with `finish_span=False`.
    """
    integration = sentry_sdk.get_client().get_integration(OpenAIIntegration)
    if integration is None:
        return f(*args, **kwargs)

    model_name = kwargs.get("model")
    embeddings_span = sentry_sdk.start_span(
        op=consts.OP.GEN_AI_EMBEDDINGS,
        name=f"embeddings {model_name}",
        origin=OpenAIIntegration.origin,
    )

    with embeddings_span as span:
        span.set_data(SPANDATA.GEN_AI_SYSTEM, "openai")
        _set_embeddings_input_data(span, kwargs, integration)

        # Hand the actual call to the sync/async driver and wait for its result.
        response = yield f, args, kwargs

        _set_embeddings_output_data(
            span,
            response,
            kwargs,
            integration,
            finish_span=False,
        )

        return response
def _wrap_embeddings_create(f: "Any") -> "Any":
    """
    Build the instrumented replacement for a synchronous embeddings `create`
    callable.

    The returned function calls `f` directly when the OpenAI integration is
    not enabled; otherwise the call is driven through the
    `_new_embeddings_create_common` generator.
    """

    def _execute_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        flow = _new_embeddings_create_common(f, *args, **kwargs)

        # First step: the generator either yields the call to perform or
        # finishes right away, in which case its return value is the result.
        try:
            target, call_args, call_kwargs = next(flow)
        except StopIteration as finished:
            return finished.value

        try:
            try:
                outcome = target(*call_args, **call_kwargs)
            except Exception as error:
                failure = sys.exc_info()
                with capture_internal_exceptions():
                    _capture_exception(error, manual_span_cleanup=False)
                reraise(*failure)

            # Second step: feed the result back; the generator is expected to
            # finish and deliver the final value through StopIteration.
            return flow.send(outcome)
        except StopIteration as finished:
            return finished.value

    @wraps(f)
    def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any":
        integration = sentry_sdk.get_client().get_integration(OpenAIIntegration)
        if integration is not None:
            return _execute_sync(f, *args, **kwargs)

        return f(*args, **kwargs)

    return _sentry_patched_create_sync
def _wrap_async_embeddings_create(f: "Any") -> "Any":
    """
    Build the instrumented replacement for an asynchronous embeddings `create`
    callable.

    The returned coroutine function awaits `f` directly when the OpenAI
    integration is not enabled; otherwise the call is driven through the
    `_new_embeddings_create_common` generator.
    """

    async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        flow = _new_embeddings_create_common(f, *args, **kwargs)

        # First step: if the generator finishes without yielding, its return
        # value is awaited and used as the result.
        try:
            target, call_args, call_kwargs = next(flow)
        except StopIteration as finished:
            return await finished.value

        try:
            try:
                outcome = await target(*call_args, **call_kwargs)
            except Exception as error:
                failure = sys.exc_info()
                with capture_internal_exceptions():
                    _capture_exception(error, manual_span_cleanup=False)
                reraise(*failure)

            # Second step: feed the result back; the generator is expected to
            # finish and deliver the final value through StopIteration.
            return flow.send(outcome)
        except StopIteration as finished:
            return finished.value

    @wraps(f)
    async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any":
        integration = sentry_sdk.get_client().get_integration(OpenAIIntegration)
        if integration is not None:
            return await _execute_async(f, *args, **kwargs)

        return await f(*args, **kwargs)

    return _sentry_patched_create_async
def _new_responses_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
    """
    Generator holding the instrumentation around a Responses API `create` call.

    Without an enabled OpenAI integration it finishes immediately, returning
    `f(*args, **kwargs)`. Otherwise it starts and enters a responses span,
    records the request data, yields `(f, args, kwargs)` so the driver can
    perform the call, and expects the call's result to be sent back.

    For a `Stream` or `AsyncStream` result that exposes `_iterator`, that
    iterator is replaced by the matching event-iterator wrapper, which is
    asked to finish the span. Any other result is passed to
    `_set_responses_api_output_data`, likewise with `finish_span=True`.

    The (possibly patched) result is returned as the generator's value.
    """
    integration = sentry_sdk.get_client().get_integration(OpenAIIntegration)
    if integration is None:
        return f(*args, **kwargs)

    model_name = kwargs.get("model")

    # The span is entered by hand because, for streams, it is only exited
    # later by the iterator wrapper installed below.
    span = sentry_sdk.start_span(
        op=consts.OP.GEN_AI_RESPONSES,
        name=f"responses {model_name}",
        origin=OpenAIIntegration.origin,
    )
    span.__enter__()

    span.set_data(SPANDATA.GEN_AI_SYSTEM, "openai")

    # Same bool handling as in https://github.com/openai/openai-python/blob/acd0c54d8a68efeedde0e5b4e6c310eef1ce7867/src/openai/resources/responses/responses.py#L940
    is_streaming_response = kwargs.get("stream", False) or False
    span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, is_streaming_response)

    _set_responses_api_input_data(span, kwargs, integration)

    # Reference point for the time-to-first-token measurement.
    start_time = time.perf_counter()

    # Hand the actual call to the sync/async driver and wait for its result.
    response = yield f, args, kwargs

    # Attribute checks to fail gracefully if the attribute is not present in future `openai` versions.
    wrap_iterator = None
    if isinstance(response, Stream) and hasattr(response, "_iterator"):
        wrap_iterator = _wrap_synchronous_responses_event_iterator
    elif isinstance(response, AsyncStream) and hasattr(response, "_iterator"):
        wrap_iterator = _wrap_asynchronous_responses_event_iterator

    if wrap_iterator is None:
        _set_responses_api_output_data(
            span,
            response,
            kwargs,
            integration,
            finish_span=True,
        )
    else:
        request_input = kwargs.get("input")
        if isinstance(request_input, str):
            request_input = [request_input]

        response._iterator = wrap_iterator(
            span=span,
            integration=integration,
            start_time=start_time,
            input=request_input,
            response=response,
            old_iterator=response._iterator,
            finish_span=True,
        )

    return response
def _wrap_responses_create(f: "Any") -> "Any":
    """
    Build the instrumented replacement for a synchronous Responses API
    `create` callable.

    The returned function calls `f` directly when the OpenAI integration is
    not enabled; otherwise the call is driven through the
    `_new_responses_create_common` generator.
    """

    def _execute_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        flow = _new_responses_create_common(f, *args, **kwargs)

        # First step: the generator either yields the call to perform or
        # finishes right away, in which case its return value is the result.
        try:
            target, call_args, call_kwargs = next(flow)
        except StopIteration as finished:
            return finished.value

        try:
            try:
                outcome = target(*call_args, **call_kwargs)
            except Exception as error:
                failure = sys.exc_info()
                with capture_internal_exceptions():
                    _capture_exception(error)
                reraise(*failure)

            # Second step: feed the result back; the generator is expected to
            # finish and deliver the final value through StopIteration.
            return flow.send(outcome)
        except StopIteration as finished:
            return finished.value

    @wraps(f)
    def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any":
        integration = sentry_sdk.get_client().get_integration(OpenAIIntegration)
        if integration is not None:
            return _execute_sync(f, *args, **kwargs)

        return f(*args, **kwargs)

    return _sentry_patched_create_sync
def _wrap_async_responses_create(f: "Any") -> "Any":
    """
    Build the instrumented replacement for an asynchronous Responses API
    `create` callable.

    The returned coroutine function awaits `f` directly when the OpenAI
    integration is not enabled; otherwise the call is driven through the
    `_new_responses_create_common` generator.
    """

    async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
        flow = _new_responses_create_common(f, *args, **kwargs)

        # First step: if the generator finishes without yielding, its return
        # value is awaited and used as the result.
        try:
            target, call_args, call_kwargs = next(flow)
        except StopIteration as finished:
            return await finished.value

        try:
            try:
                outcome = await target(*call_args, **call_kwargs)
            except Exception as error:
                failure = sys.exc_info()
                with capture_internal_exceptions():
                    _capture_exception(error)
                reraise(*failure)

            # Second step: feed the result back; the generator is expected to
            # finish and deliver the final value through StopIteration.
            return flow.send(outcome)
        except StopIteration as finished:
            return finished.value

    @wraps(f)
    async def _sentry_patched_responses_async(*args: "Any", **kwargs: "Any") -> "Any":
        integration = sentry_sdk.get_client().get_integration(OpenAIIntegration)
        if integration is not None:
            return await _execute_async(f, *args, **kwargs)

        return await f(*args, **kwargs)

    return _sentry_patched_responses_async
def _is_given(obj: "Any") -> bool:
    """
    Tell whether `obj` is a real value rather than a "not provided" marker.

    Returns False when `obj` is an instance of `NotGiven` or of `Omit`; either
    name may be None (the `is not None` guards skip that check), which keeps
    this safe across different openai versions. Returns True otherwise.
    """
    is_not_given = NotGiven is not None and isinstance(obj, NotGiven)
    if is_not_given:
        return False

    is_omitted = Omit is not None and isinstance(obj, Omit)
    return not is_omitted
|