pymongo.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import copy
  2. import json
  3. import sentry_sdk
  4. from sentry_sdk.consts import SPANSTATUS, SPANDATA, OP
  5. from sentry_sdk.integrations import DidNotEnable, Integration
  6. from sentry_sdk.scope import should_send_default_pii
  7. from sentry_sdk.tracing import Span
  8. from sentry_sdk.utils import capture_internal_exceptions
  9. try:
  10. from pymongo import monitoring
  11. except ImportError:
  12. raise DidNotEnable("Pymongo not installed")
  13. from typing import TYPE_CHECKING
  14. if TYPE_CHECKING:
  15. from typing import Any, Dict, Union
  16. from pymongo.monitoring import (
  17. CommandFailedEvent,
  18. CommandStartedEvent,
  19. CommandSucceededEvent,
  20. )
  21. SAFE_COMMAND_ATTRIBUTES = [
  22. "insert",
  23. "ordered",
  24. "find",
  25. "limit",
  26. "singleBatch",
  27. "aggregate",
  28. "createIndexes",
  29. "indexes",
  30. "delete",
  31. "findAndModify",
  32. "renameCollection",
  33. "to",
  34. "drop",
  35. ]
  36. def _strip_pii(command):
  37. # type: (Dict[str, Any]) -> Dict[str, Any]
  38. for key in command:
  39. is_safe_field = key in SAFE_COMMAND_ATTRIBUTES
  40. if is_safe_field:
  41. # Skip if safe key
  42. continue
  43. update_db_command = key == "update" and "findAndModify" not in command
  44. if update_db_command:
  45. # Also skip "update" db command because it is save.
  46. # There is also an "update" key in the "findAndModify" command, which is NOT safe!
  47. continue
  48. # Special stripping for documents
  49. is_document = key == "documents"
  50. if is_document:
  51. for doc in command[key]:
  52. for doc_key in doc:
  53. doc[doc_key] = "%s"
  54. continue
  55. # Special stripping for dict style fields
  56. is_dict_field = key in ["filter", "query", "update"]
  57. if is_dict_field:
  58. for item_key in command[key]:
  59. command[key][item_key] = "%s"
  60. continue
  61. # For pipeline fields strip the `$match` dict
  62. is_pipeline_field = key == "pipeline"
  63. if is_pipeline_field:
  64. for pipeline in command[key]:
  65. for match_key in pipeline["$match"] if "$match" in pipeline else []:
  66. pipeline["$match"][match_key] = "%s"
  67. continue
  68. # Default stripping
  69. command[key] = "%s"
  70. return command
  71. def _get_db_data(event):
  72. # type: (Any) -> Dict[str, Any]
  73. data = {}
  74. data[SPANDATA.DB_SYSTEM] = "mongodb"
  75. db_name = event.database_name
  76. if db_name is not None:
  77. data[SPANDATA.DB_NAME] = db_name
  78. server_address = event.connection_id[0]
  79. if server_address is not None:
  80. data[SPANDATA.SERVER_ADDRESS] = server_address
  81. server_port = event.connection_id[1]
  82. if server_port is not None:
  83. data[SPANDATA.SERVER_PORT] = server_port
  84. return data
  85. class CommandTracer(monitoring.CommandListener):
  86. def __init__(self):
  87. # type: () -> None
  88. self._ongoing_operations = {} # type: Dict[int, Span]
  89. def _operation_key(self, event):
  90. # type: (Union[CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent]) -> int
  91. return event.request_id
  92. def started(self, event):
  93. # type: (CommandStartedEvent) -> None
  94. if sentry_sdk.get_client().get_integration(PyMongoIntegration) is None:
  95. return
  96. with capture_internal_exceptions():
  97. command = dict(copy.deepcopy(event.command))
  98. command.pop("$db", None)
  99. command.pop("$clusterTime", None)
  100. command.pop("$signature", None)
  101. tags = {
  102. "db.name": event.database_name,
  103. SPANDATA.DB_SYSTEM: "mongodb",
  104. SPANDATA.DB_OPERATION: event.command_name,
  105. SPANDATA.DB_MONGODB_COLLECTION: command.get(event.command_name),
  106. }
  107. try:
  108. tags["net.peer.name"] = event.connection_id[0]
  109. tags["net.peer.port"] = str(event.connection_id[1])
  110. except TypeError:
  111. pass
  112. data = {"operation_ids": {}} # type: Dict[str, Any]
  113. data["operation_ids"]["operation"] = event.operation_id
  114. data["operation_ids"]["request"] = event.request_id
  115. data.update(_get_db_data(event))
  116. try:
  117. lsid = command.pop("lsid")["id"]
  118. data["operation_ids"]["session"] = str(lsid)
  119. except KeyError:
  120. pass
  121. if not should_send_default_pii():
  122. command = _strip_pii(command)
  123. query = json.dumps(command, default=str)
  124. span = sentry_sdk.start_span(
  125. op=OP.DB,
  126. name=query,
  127. origin=PyMongoIntegration.origin,
  128. )
  129. for tag, value in tags.items():
  130. # set the tag for backwards-compatibility.
  131. # TODO: remove the set_tag call in the next major release!
  132. span.set_tag(tag, value)
  133. span.set_data(tag, value)
  134. for key, value in data.items():
  135. span.set_data(key, value)
  136. with capture_internal_exceptions():
  137. sentry_sdk.add_breadcrumb(
  138. message=query, category="query", type=OP.DB, data=tags
  139. )
  140. self._ongoing_operations[self._operation_key(event)] = span.__enter__()
  141. def failed(self, event):
  142. # type: (CommandFailedEvent) -> None
  143. if sentry_sdk.get_client().get_integration(PyMongoIntegration) is None:
  144. return
  145. try:
  146. span = self._ongoing_operations.pop(self._operation_key(event))
  147. span.set_status(SPANSTATUS.INTERNAL_ERROR)
  148. span.__exit__(None, None, None)
  149. except KeyError:
  150. return
  151. def succeeded(self, event):
  152. # type: (CommandSucceededEvent) -> None
  153. if sentry_sdk.get_client().get_integration(PyMongoIntegration) is None:
  154. return
  155. try:
  156. span = self._ongoing_operations.pop(self._operation_key(event))
  157. span.set_status(SPANSTATUS.OK)
  158. span.__exit__(None, None, None)
  159. except KeyError:
  160. pass
  161. class PyMongoIntegration(Integration):
  162. identifier = "pymongo"
  163. origin = f"auto.db.{identifier}"
  164. @staticmethod
  165. def setup_once():
  166. # type: () -> None
  167. monitoring.register(CommandTracer())