asyncpg.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. from __future__ import annotations
  2. import contextlib
  3. import re
  4. from typing import Any, TypeVar, Callable, Awaitable, Iterator
  5. import sentry_sdk
  6. from sentry_sdk.consts import OP, SPANDATA
  7. from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable
  8. from sentry_sdk.tracing import Span
  9. from sentry_sdk.tracing_utils import add_query_source, record_sql_queries
  10. from sentry_sdk.utils import (
  11. ensure_integration_enabled,
  12. parse_version,
  13. capture_internal_exceptions,
  14. )
  15. try:
  16. import asyncpg # type: ignore[import-not-found]
  17. from asyncpg.cursor import BaseCursor # type: ignore
  18. except ImportError:
  19. raise DidNotEnable("asyncpg not installed.")
  20. class AsyncPGIntegration(Integration):
  21. identifier = "asyncpg"
  22. origin = f"auto.db.{identifier}"
  23. _record_params = False
  24. def __init__(self, *, record_params: bool = False):
  25. AsyncPGIntegration._record_params = record_params
  26. @staticmethod
  27. def setup_once() -> None:
  28. # asyncpg.__version__ is a string containing the semantic version in the form of "<major>.<minor>.<patch>"
  29. asyncpg_version = parse_version(asyncpg.__version__)
  30. _check_minimum_version(AsyncPGIntegration, asyncpg_version)
  31. asyncpg.Connection.execute = _wrap_execute(
  32. asyncpg.Connection.execute,
  33. )
  34. asyncpg.Connection._execute = _wrap_connection_method(
  35. asyncpg.Connection._execute
  36. )
  37. asyncpg.Connection._executemany = _wrap_connection_method(
  38. asyncpg.Connection._executemany, executemany=True
  39. )
  40. asyncpg.Connection.cursor = _wrap_cursor_creation(asyncpg.Connection.cursor)
  41. asyncpg.Connection.prepare = _wrap_connection_method(asyncpg.Connection.prepare)
  42. asyncpg.connect_utils._connect_addr = _wrap_connect_addr(
  43. asyncpg.connect_utils._connect_addr
  44. )
  45. T = TypeVar("T")
  46. def _normalize_query(query: str) -> str:
  47. return re.sub(r"\s+", " ", query).strip()
  48. def _wrap_execute(f: "Callable[..., Awaitable[T]]") -> "Callable[..., Awaitable[T]]":
  49. async def _inner(*args: "Any", **kwargs: "Any") -> "T":
  50. if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
  51. return await f(*args, **kwargs)
  52. # Avoid recording calls to _execute twice.
  53. # Calls to Connection.execute with args also call
  54. # Connection._execute, which is recorded separately
  55. # args[0] = the connection object, args[1] is the query
  56. if len(args) > 2:
  57. return await f(*args, **kwargs)
  58. query = _normalize_query(args[1])
  59. with record_sql_queries(
  60. cursor=None,
  61. query=query,
  62. params_list=None,
  63. paramstyle=None,
  64. executemany=False,
  65. span_origin=AsyncPGIntegration.origin,
  66. ) as span:
  67. res = await f(*args, **kwargs)
  68. with capture_internal_exceptions():
  69. add_query_source(span)
  70. return res
  71. return _inner
  72. SubCursor = TypeVar("SubCursor", bound=BaseCursor)
  73. @contextlib.contextmanager
  74. def _record(
  75. cursor: "SubCursor | None",
  76. query: str,
  77. params_list: "tuple[Any, ...] | None",
  78. *,
  79. executemany: bool = False,
  80. ) -> "Iterator[Span]":
  81. integration = sentry_sdk.get_client().get_integration(AsyncPGIntegration)
  82. if integration is not None and not integration._record_params:
  83. params_list = None
  84. param_style = "pyformat" if params_list else None
  85. query = _normalize_query(query)
  86. with record_sql_queries(
  87. cursor=cursor,
  88. query=query,
  89. params_list=params_list,
  90. paramstyle=param_style,
  91. executemany=executemany,
  92. record_cursor_repr=cursor is not None,
  93. span_origin=AsyncPGIntegration.origin,
  94. ) as span:
  95. yield span
  96. def _wrap_connection_method(
  97. f: "Callable[..., Awaitable[T]]", *, executemany: bool = False
  98. ) -> "Callable[..., Awaitable[T]]":
  99. async def _inner(*args: "Any", **kwargs: "Any") -> "T":
  100. if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
  101. return await f(*args, **kwargs)
  102. query = args[1]
  103. params_list = args[2] if len(args) > 2 else None
  104. with _record(None, query, params_list, executemany=executemany) as span:
  105. _set_db_data(span, args[0])
  106. res = await f(*args, **kwargs)
  107. return res
  108. return _inner
  109. def _wrap_cursor_creation(f: "Callable[..., T]") -> "Callable[..., T]":
  110. @ensure_integration_enabled(AsyncPGIntegration, f)
  111. def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807
  112. query = args[1]
  113. params_list = args[2] if len(args) > 2 else None
  114. with _record(
  115. None,
  116. query,
  117. params_list,
  118. executemany=False,
  119. ) as span:
  120. _set_db_data(span, args[0])
  121. res = f(*args, **kwargs)
  122. span.set_data("db.cursor", res)
  123. return res
  124. return _inner
  125. def _wrap_connect_addr(
  126. f: "Callable[..., Awaitable[T]]",
  127. ) -> "Callable[..., Awaitable[T]]":
  128. async def _inner(*args: "Any", **kwargs: "Any") -> "T":
  129. if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
  130. return await f(*args, **kwargs)
  131. user = kwargs["params"].user
  132. database = kwargs["params"].database
  133. with sentry_sdk.start_span(
  134. op=OP.DB,
  135. name="connect",
  136. origin=AsyncPGIntegration.origin,
  137. ) as span:
  138. span.set_data(SPANDATA.DB_SYSTEM, "postgresql")
  139. addr = kwargs.get("addr")
  140. if addr:
  141. try:
  142. span.set_data(SPANDATA.SERVER_ADDRESS, addr[0])
  143. span.set_data(SPANDATA.SERVER_PORT, addr[1])
  144. except IndexError:
  145. pass
  146. span.set_data(SPANDATA.DB_NAME, database)
  147. span.set_data(SPANDATA.DB_USER, user)
  148. with capture_internal_exceptions():
  149. sentry_sdk.add_breadcrumb(
  150. message="connect", category="query", data=span._data
  151. )
  152. res = await f(*args, **kwargs)
  153. return res
  154. return _inner
  155. def _set_db_data(span: "Span", conn: "Any") -> None:
  156. span.set_data(SPANDATA.DB_SYSTEM, "postgresql")
  157. addr = conn._addr
  158. if addr:
  159. try:
  160. span.set_data(SPANDATA.SERVER_ADDRESS, addr[0])
  161. span.set_data(SPANDATA.SERVER_PORT, addr[1])
  162. except IndexError:
  163. pass
  164. database = conn._params.database
  165. if database:
  166. span.set_data(SPANDATA.DB_NAME, database)
  167. user = conn._params.user
  168. if user:
  169. span.set_data(SPANDATA.DB_USER, user)