| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- from typing import TYPE_CHECKING, Any, List, TypedDict, Optional
- from sentry_sdk.ai.utils import set_data_normalized
- from sentry_sdk.consts import SPANDATA
- from sentry_sdk.scope import should_send_default_pii
- from sentry_sdk.utils import (
- safe_serialize,
- )
- from .utils import (
- extract_tool_calls,
- extract_finish_reasons,
- extract_contents_text,
- extract_usage_data,
- UsageData,
- )
- if TYPE_CHECKING:
- from sentry_sdk.tracing import Span
- from google.genai.types import GenerateContentResponse
class AccumulatedResponse(TypedDict):
    """Response-like summary of a streamed ``generate_content`` call.

    Produced by ``accumulate_streaming_response`` and read by
    ``set_span_data_for_streaming_response``. The TypedDict is total, so
    every key is always present; missing information is expressed as
    ``None`` or an empty value rather than by omitting the key.
    """

    # Response identifier; None when none was captured.
    id: "Optional[str]"
    # Model name/version; None when none was captured.
    model: "Optional[str]"
    # Text extracted from all chunks, joined in arrival order.
    text: str
    # Finish reasons collected from every chunk, in order.
    finish_reasons: "List[str]"
    # Tool calls collected from every chunk, in order.
    tool_calls: "List[dict[str, Any]]"
    # Merged token usage; None when no chunk was processed.
    usage_metadata: "Optional[UsageData]"
def element_wise_usage_max(self: "UsageData", other: "UsageData") -> "UsageData":
    """Merge two token-usage reports, keeping the larger value of each counter.

    The streaming accumulator folds every chunk's usage into a running value
    with this function, so a chunk that reports a smaller (or zero) count for
    some counter never lowers a value that was already seen.

    :param self: Usage accumulated so far. (Despite the name, this is a
        module-level function, not a method.)
    :param other: Usage reported by the next chunk.
    :returns: A new ``UsageData``; neither argument is mutated.
    """
    input_tokens = max(self["input_tokens"], other["input_tokens"])
    output_tokens = max(self["output_tokens"], other["output_tokens"])
    input_tokens_cached = max(
        self["input_tokens_cached"], other["input_tokens_cached"]
    )
    output_tokens_reasoning = max(
        self["output_tokens_reasoning"], other["output_tokens_reasoning"]
    )
    total_tokens = max(self["total_tokens"], other["total_tokens"])

    return UsageData(
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        input_tokens_cached=input_tokens_cached,
        output_tokens_reasoning=output_tokens_reasoning,
        total_tokens=total_tokens,
    )
def accumulate_streaming_response(
    chunks: "List[GenerateContentResponse]",
) -> "AccumulatedResponse":
    """Accumulate streaming chunks into a single response-like object.

    - Text extracted from each candidate's content is concatenated in
      arrival order.
    - Finish reasons and tool calls from every chunk are collected in order.
    - Token usage is merged with ``element_wise_usage_max``, so the most
      complete report wins even if the stream was interrupted or a chunk
      omitted some counters.
    - The response id (``response_id``) and model version
      (``model_version``) are taken from the first chunk that reports a
      non-empty value.

    :param chunks: Chunks yielded by a streaming ``generate_content`` call,
        in arrival order.
    :returns: The accumulated response. ``usage_metadata`` stays ``None``
        when ``chunks`` is empty.
    """
    accumulated_text: "List[str]" = []
    finish_reasons: "List[str]" = []
    tool_calls: "List[dict[str, Any]]" = []
    usage_data: "Optional[UsageData]" = None
    response_id: "Optional[str]" = None
    model: "Optional[str]" = None

    for chunk in chunks:
        # NOTE(review): the id and model version are assumed to be identical
        # on every chunk of one stream; keep the first non-empty value so they
        # can be recorded on the span (confirm against google-genai).
        if not response_id:
            response_id = getattr(chunk, "response_id", None)
        if not model:
            model = getattr(chunk, "model_version", None)

        # Extract text from every candidate that actually carries content parts.
        for candidate in getattr(chunk, "candidates", None) or []:
            content = getattr(candidate, "content", None)
            if getattr(content, "parts", None):
                extracted_text = extract_contents_text(content)
                if extracted_text:
                    accumulated_text.append(extracted_text)

        extracted_finish_reasons = extract_finish_reasons(chunk)
        if extracted_finish_reasons:
            finish_reasons.extend(extracted_finish_reasons)

        extracted_tool_calls = extract_tool_calls(chunk)
        if extracted_tool_calls:
            tool_calls.extend(extracted_tool_calls)

        # Use last possible chunk, in case of interruption, and
        # gracefully handle missing intermediate tokens by taking maximum
        # with previous token reporting.
        chunk_usage_data = extract_usage_data(chunk)
        usage_data = (
            chunk_usage_data
            if usage_data is None
            else element_wise_usage_max(usage_data, chunk_usage_data)
        )

    return AccumulatedResponse(
        text="".join(accumulated_text),
        finish_reasons=finish_reasons,
        tool_calls=tool_calls,
        usage_metadata=usage_data,
        id=response_id,
        model=model,
    )
def set_span_data_for_streaming_response(
    span: "Span", integration: "Any", accumulated_response: "AccumulatedResponse"
) -> None:
    """Record an accumulated streaming response on ``span``.

    Only truthy values are written. The response text is attached only when
    ``should_send_default_pii()`` is true and ``integration.include_prompts``
    is set. Token counters are skipped entirely when ``usage_metadata`` is
    ``None``, and individually when a counter is zero.

    :param span: Span receiving the data via ``set_data``.
    :param integration: Integration instance; only ``include_prompts`` is read.
    :param accumulated_response: Result of ``accumulate_streaming_response``.
    """
    # Response text may contain user data, so it is gated on PII settings.
    if (
        should_send_default_pii()
        and integration.include_prompts
        and accumulated_response.get("text")
    ):
        span.set_data(
            SPANDATA.GEN_AI_RESPONSE_TEXT,
            safe_serialize([accumulated_response["text"]]),
        )

    finish_reasons = accumulated_response.get("finish_reasons")
    if finish_reasons:
        set_data_normalized(
            span, SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS, finish_reasons
        )

    tool_calls = accumulated_response.get("tool_calls")
    if tool_calls:
        span.set_data(SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS, safe_serialize(tool_calls))

    response_id = accumulated_response.get("id")
    if response_id:
        span.set_data(SPANDATA.GEN_AI_RESPONSE_ID, response_id)

    model = accumulated_response.get("model")
    if model:
        span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, model)

    usage = accumulated_response["usage_metadata"]
    if usage is None:
        return

    # (span attribute, counter) pairs, in the order they are recorded.
    token_counters = (
        (SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, usage["input_tokens"]),
        (SPANDATA.GEN_AI_USAGE_INPUT_TOKENS_CACHED, usage["input_tokens_cached"]),
        (SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, usage["output_tokens"]),
        (
            SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS_REASONING,
            usage["output_tokens_reasoning"],
        ),
        (SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, usage["total_tokens"]),
    )
    for span_key, token_count in token_counters:
        if token_count:
            span.set_data(span_key, token_count)
|