pymongo.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. import copy
  2. import json
  3. import sentry_sdk
  4. from sentry_sdk.consts import SPANSTATUS, SPANDATA, OP
  5. from sentry_sdk.integrations import DidNotEnable, Integration
  6. from sentry_sdk.scope import should_send_default_pii
  7. from sentry_sdk.tracing import Span
  8. from sentry_sdk.utils import capture_internal_exceptions
  9. try:
  10. from pymongo import monitoring
  11. except ImportError:
  12. raise DidNotEnable("Pymongo not installed")
  13. from typing import TYPE_CHECKING
  14. if TYPE_CHECKING:
  15. from typing import Any, Dict, Union
  16. from pymongo.monitoring import (
  17. CommandFailedEvent,
  18. CommandStartedEvent,
  19. CommandSucceededEvent,
  20. )
  21. SAFE_COMMAND_ATTRIBUTES = [
  22. "insert",
  23. "ordered",
  24. "find",
  25. "limit",
  26. "singleBatch",
  27. "aggregate",
  28. "createIndexes",
  29. "indexes",
  30. "delete",
  31. "findAndModify",
  32. "renameCollection",
  33. "to",
  34. "drop",
  35. ]
  36. def _strip_pii(command: "Dict[str, Any]") -> "Dict[str, Any]":
  37. for key in command:
  38. is_safe_field = key in SAFE_COMMAND_ATTRIBUTES
  39. if is_safe_field:
  40. # Skip if safe key
  41. continue
  42. update_db_command = key == "update" and "findAndModify" not in command
  43. if update_db_command:
  44. # Also skip "update" db command because it is save.
  45. # There is also an "update" key in the "findAndModify" command, which is NOT safe!
  46. continue
  47. # Special stripping for documents
  48. is_document = key == "documents"
  49. if is_document:
  50. for doc in command[key]:
  51. for doc_key in doc:
  52. doc[doc_key] = "%s"
  53. continue
  54. # Special stripping for dict style fields
  55. is_dict_field = key in ["filter", "query", "update"]
  56. if is_dict_field:
  57. for item_key in command[key]:
  58. command[key][item_key] = "%s"
  59. continue
  60. # For pipeline fields strip the `$match` dict
  61. is_pipeline_field = key == "pipeline"
  62. if is_pipeline_field:
  63. for pipeline in command[key]:
  64. for match_key in pipeline["$match"] if "$match" in pipeline else []:
  65. pipeline["$match"][match_key] = "%s"
  66. continue
  67. # Default stripping
  68. command[key] = "%s"
  69. return command
  70. def _get_db_data(event: "Any") -> "Dict[str, Any]":
  71. data = {}
  72. data[SPANDATA.DB_SYSTEM] = "mongodb"
  73. db_name = event.database_name
  74. if db_name is not None:
  75. data[SPANDATA.DB_NAME] = db_name
  76. server_address = event.connection_id[0]
  77. if server_address is not None:
  78. data[SPANDATA.SERVER_ADDRESS] = server_address
  79. server_port = event.connection_id[1]
  80. if server_port is not None:
  81. data[SPANDATA.SERVER_PORT] = server_port
  82. return data
  83. class CommandTracer(monitoring.CommandListener):
  84. def __init__(self) -> None:
  85. self._ongoing_operations: "Dict[int, Span]" = {}
  86. def _operation_key(
  87. self,
  88. event: "Union[CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent]",
  89. ) -> int:
  90. return event.request_id
  91. def started(self, event: "CommandStartedEvent") -> None:
  92. if sentry_sdk.get_client().get_integration(PyMongoIntegration) is None:
  93. return
  94. with capture_internal_exceptions():
  95. command = dict(copy.deepcopy(event.command))
  96. command.pop("$db", None)
  97. command.pop("$clusterTime", None)
  98. command.pop("$signature", None)
  99. tags = {
  100. "db.name": event.database_name,
  101. SPANDATA.DB_SYSTEM: "mongodb",
  102. SPANDATA.DB_OPERATION: event.command_name,
  103. SPANDATA.DB_MONGODB_COLLECTION: command.get(event.command_name),
  104. }
  105. try:
  106. tags["net.peer.name"] = event.connection_id[0]
  107. tags["net.peer.port"] = str(event.connection_id[1])
  108. except TypeError:
  109. pass
  110. data: "Dict[str, Any]" = {"operation_ids": {}}
  111. data["operation_ids"]["operation"] = event.operation_id
  112. data["operation_ids"]["request"] = event.request_id
  113. data.update(_get_db_data(event))
  114. try:
  115. lsid = command.pop("lsid")["id"]
  116. data["operation_ids"]["session"] = str(lsid)
  117. except KeyError:
  118. pass
  119. if not should_send_default_pii():
  120. command = _strip_pii(command)
  121. query = json.dumps(command, default=str)
  122. span = sentry_sdk.start_span(
  123. op=OP.DB,
  124. name=query,
  125. origin=PyMongoIntegration.origin,
  126. )
  127. for tag, value in tags.items():
  128. # set the tag for backwards-compatibility.
  129. # TODO: remove the set_tag call in the next major release!
  130. span.set_tag(tag, value)
  131. span.set_data(tag, value)
  132. for key, value in data.items():
  133. span.set_data(key, value)
  134. with capture_internal_exceptions():
  135. sentry_sdk.add_breadcrumb(
  136. message=query, category="query", type=OP.DB, data=tags
  137. )
  138. self._ongoing_operations[self._operation_key(event)] = span.__enter__()
  139. def failed(self, event: "CommandFailedEvent") -> None:
  140. if sentry_sdk.get_client().get_integration(PyMongoIntegration) is None:
  141. return
  142. try:
  143. span = self._ongoing_operations.pop(self._operation_key(event))
  144. span.set_status(SPANSTATUS.INTERNAL_ERROR)
  145. span.__exit__(None, None, None)
  146. except KeyError:
  147. return
  148. def succeeded(self, event: "CommandSucceededEvent") -> None:
  149. if sentry_sdk.get_client().get_integration(PyMongoIntegration) is None:
  150. return
  151. try:
  152. span = self._ongoing_operations.pop(self._operation_key(event))
  153. span.set_status(SPANSTATUS.OK)
  154. span.__exit__(None, None, None)
  155. except KeyError:
  156. pass
  157. class PyMongoIntegration(Integration):
  158. identifier = "pymongo"
  159. origin = f"auto.db.{identifier}"
  160. @staticmethod
  161. def setup_once() -> None:
  162. monitoring.register(CommandTracer())