beam.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import sys
  2. import types
  3. from functools import wraps
  4. import sentry_sdk
  5. from sentry_sdk.integrations import Integration
  6. from sentry_sdk.integrations.logging import ignore_logger
  7. from sentry_sdk.utils import (
  8. capture_internal_exceptions,
  9. ensure_integration_enabled,
  10. event_from_exception,
  11. reraise,
  12. )
  13. from typing import TYPE_CHECKING
  14. if TYPE_CHECKING:
  15. from typing import Any
  16. from typing import Iterator
  17. from typing import TypeVar
  18. from typing import Callable
  19. from sentry_sdk._types import ExcInfo
  20. T = TypeVar("T")
  21. F = TypeVar("F", bound=Callable[..., Any])
# Attribute-name template under which the original, unwrapped function is
# stored on a patched object (formatted with the function's name).
WRAPPED_FUNC = "_wrapped_{}_"
# Attribute-name template for the inspect hook that setup_once sets on DoFn.
INSPECT_FUNC = "_inspect_{}" # Required format per apache_beam/transforms/core.py
# Marker attribute set to True on the wrapper functions created in this module,
# used to recognise functions that have already been patched.
USED_FUNC = "_sentry_used_"
  25. class BeamIntegration(Integration):
  26. identifier = "beam"
  27. @staticmethod
  28. def setup_once():
  29. # type: () -> None
  30. from apache_beam.transforms.core import DoFn, ParDo # type: ignore
  31. ignore_logger("root")
  32. ignore_logger("bundle_processor.create")
  33. function_patches = ["process", "start_bundle", "finish_bundle", "setup"]
  34. for func_name in function_patches:
  35. setattr(
  36. DoFn,
  37. INSPECT_FUNC.format(func_name),
  38. _wrap_inspect_call(DoFn, func_name),
  39. )
  40. old_init = ParDo.__init__
  41. def sentry_init_pardo(self, fn, *args, **kwargs):
  42. # type: (ParDo, Any, *Any, **Any) -> Any
  43. # Do not monkey patch init twice
  44. if not getattr(self, "_sentry_is_patched", False):
  45. for func_name in function_patches:
  46. if not hasattr(fn, func_name):
  47. continue
  48. wrapped_func = WRAPPED_FUNC.format(func_name)
  49. # Check to see if inspect is set and process is not
  50. # to avoid monkey patching process twice.
  51. # Check to see if function is part of object for
  52. # backwards compatibility.
  53. process_func = getattr(fn, func_name)
  54. inspect_func = getattr(fn, INSPECT_FUNC.format(func_name))
  55. if not getattr(inspect_func, USED_FUNC, False) and not getattr(
  56. process_func, USED_FUNC, False
  57. ):
  58. setattr(fn, wrapped_func, process_func)
  59. setattr(fn, func_name, _wrap_task_call(process_func))
  60. self._sentry_is_patched = True
  61. old_init(self, fn, *args, **kwargs)
  62. ParDo.__init__ = sentry_init_pardo
  63. def _wrap_inspect_call(cls, func_name):
  64. # type: (Any, Any) -> Any
  65. if not hasattr(cls, func_name):
  66. return None
  67. def _inspect(self):
  68. # type: (Any) -> Any
  69. """
  70. Inspect function overrides the way Beam gets argspec.
  71. """
  72. wrapped_func = WRAPPED_FUNC.format(func_name)
  73. if hasattr(self, wrapped_func):
  74. process_func = getattr(self, wrapped_func)
  75. else:
  76. process_func = getattr(self, func_name)
  77. setattr(self, func_name, _wrap_task_call(process_func))
  78. setattr(self, wrapped_func, process_func)
  79. # getfullargspec is deprecated in more recent beam versions and get_function_args_defaults
  80. # (which uses Signatures internally) should be used instead.
  81. try:
  82. from apache_beam.transforms.core import get_function_args_defaults
  83. return get_function_args_defaults(process_func)
  84. except ImportError:
  85. from apache_beam.typehints.decorators import getfullargspec # type: ignore
  86. return getfullargspec(process_func)
  87. setattr(_inspect, USED_FUNC, True)
  88. return _inspect
  89. def _wrap_task_call(func):
  90. # type: (F) -> F
  91. """
  92. Wrap task call with a try catch to get exceptions.
  93. """
  94. @wraps(func)
  95. def _inner(*args, **kwargs):
  96. # type: (*Any, **Any) -> Any
  97. try:
  98. gen = func(*args, **kwargs)
  99. except Exception:
  100. raise_exception()
  101. if not isinstance(gen, types.GeneratorType):
  102. return gen
  103. return _wrap_generator_call(gen)
  104. setattr(_inner, USED_FUNC, True)
  105. return _inner # type: ignore
  106. @ensure_integration_enabled(BeamIntegration)
  107. def _capture_exception(exc_info):
  108. # type: (ExcInfo) -> None
  109. """
  110. Send Beam exception to Sentry.
  111. """
  112. client = sentry_sdk.get_client()
  113. event, hint = event_from_exception(
  114. exc_info,
  115. client_options=client.options,
  116. mechanism={"type": "beam", "handled": False},
  117. )
  118. sentry_sdk.capture_event(event, hint=hint)
  119. def raise_exception():
  120. # type: () -> None
  121. """
  122. Raise an exception.
  123. """
  124. exc_info = sys.exc_info()
  125. with capture_internal_exceptions():
  126. _capture_exception(exc_info)
  127. reraise(*exc_info)
  128. def _wrap_generator_call(gen):
  129. # type: (Iterator[T]) -> Iterator[T]
  130. """
  131. Wrap the generator to handle any failures.
  132. """
  133. while True:
  134. try:
  135. yield next(gen)
  136. except StopIteration:
  137. break
  138. except Exception:
  139. raise_exception()