beat.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. import sentry_sdk
  2. from sentry_sdk.crons import capture_checkin, MonitorStatus
  3. from sentry_sdk.integrations import DidNotEnable
  4. from sentry_sdk.integrations.celery.utils import (
  5. _get_humanized_interval,
  6. _now_seconds_since_epoch,
  7. )
  8. from sentry_sdk.utils import (
  9. logger,
  10. match_regex_list,
  11. )
  12. from typing import TYPE_CHECKING
  13. if TYPE_CHECKING:
  14. from collections.abc import Callable
  15. from typing import Any, Optional, TypeVar, Union
  16. from sentry_sdk._types import (
  17. MonitorConfig,
  18. MonitorConfigScheduleType,
  19. MonitorConfigScheduleUnit,
  20. )
  21. F = TypeVar("F", bound=Callable[..., Any])
  22. try:
  23. from celery import Task, Celery # type: ignore
  24. from celery.beat import Scheduler # type: ignore
  25. from celery.schedules import crontab, schedule # type: ignore
  26. from celery.signals import ( # type: ignore
  27. task_failure,
  28. task_success,
  29. task_retry,
  30. )
  31. except ImportError:
  32. raise DidNotEnable("Celery not installed")
  33. try:
  34. from redbeat.schedulers import RedBeatScheduler # type: ignore
  35. except ImportError:
  36. RedBeatScheduler = None
  37. def _get_headers(task: "Task") -> "dict[str, Any]":
  38. headers = task.request.get("headers") or {}
  39. # flatten nested headers
  40. if "headers" in headers:
  41. headers.update(headers["headers"])
  42. del headers["headers"]
  43. headers.update(task.request.get("properties") or {})
  44. return headers
  45. def _get_monitor_config(
  46. celery_schedule: "Any", app: "Celery", monitor_name: str
  47. ) -> "MonitorConfig":
  48. monitor_config: "MonitorConfig" = {}
  49. schedule_type: "Optional[MonitorConfigScheduleType]" = None
  50. schedule_value: "Optional[Union[str, int]]" = None
  51. schedule_unit: "Optional[MonitorConfigScheduleUnit]" = None
  52. if isinstance(celery_schedule, crontab):
  53. schedule_type = "crontab"
  54. schedule_value = (
  55. "{0._orig_minute} "
  56. "{0._orig_hour} "
  57. "{0._orig_day_of_month} "
  58. "{0._orig_month_of_year} "
  59. "{0._orig_day_of_week}".format(celery_schedule)
  60. )
  61. elif isinstance(celery_schedule, schedule):
  62. schedule_type = "interval"
  63. (schedule_value, schedule_unit) = _get_humanized_interval(
  64. celery_schedule.seconds
  65. )
  66. if schedule_unit == "second":
  67. logger.warning(
  68. "Intervals shorter than one minute are not supported by Sentry Crons. Monitor '%s' has an interval of %s seconds. Use the `exclude_beat_tasks` option in the celery integration to exclude it.",
  69. monitor_name,
  70. schedule_value,
  71. )
  72. return {}
  73. else:
  74. logger.warning(
  75. "Celery schedule type '%s' not supported by Sentry Crons.",
  76. type(celery_schedule),
  77. )
  78. return {}
  79. monitor_config["schedule"] = {}
  80. monitor_config["schedule"]["type"] = schedule_type
  81. monitor_config["schedule"]["value"] = schedule_value
  82. if schedule_unit is not None:
  83. monitor_config["schedule"]["unit"] = schedule_unit
  84. monitor_config["timezone"] = (
  85. (
  86. hasattr(celery_schedule, "tz")
  87. and celery_schedule.tz is not None
  88. and str(celery_schedule.tz)
  89. )
  90. or app.timezone
  91. or "UTC"
  92. )
  93. return monitor_config
  94. def _apply_crons_data_to_schedule_entry(
  95. scheduler: "Any",
  96. schedule_entry: "Any",
  97. integration: "sentry_sdk.integrations.celery.CeleryIntegration",
  98. ) -> None:
  99. """
  100. Add Sentry Crons information to the schedule_entry headers.
  101. """
  102. if not integration.monitor_beat_tasks:
  103. return
  104. monitor_name = schedule_entry.name
  105. task_should_be_excluded = match_regex_list(
  106. monitor_name, integration.exclude_beat_tasks
  107. )
  108. if task_should_be_excluded:
  109. return
  110. celery_schedule = schedule_entry.schedule
  111. app = scheduler.app
  112. monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
  113. is_supported_schedule = bool(monitor_config)
  114. if not is_supported_schedule:
  115. return
  116. headers = schedule_entry.options.pop("headers", {})
  117. headers.update(
  118. {
  119. "sentry-monitor-slug": monitor_name,
  120. "sentry-monitor-config": monitor_config,
  121. }
  122. )
  123. check_in_id = capture_checkin(
  124. monitor_slug=monitor_name,
  125. monitor_config=monitor_config,
  126. status=MonitorStatus.IN_PROGRESS,
  127. )
  128. headers.update({"sentry-monitor-check-in-id": check_in_id})
  129. # Set the Sentry configuration in the options of the ScheduleEntry.
  130. # Those will be picked up in `apply_async` and added to the headers.
  131. schedule_entry.options["headers"] = headers
  132. def _wrap_beat_scheduler(
  133. original_function: "Callable[..., Any]",
  134. ) -> "Callable[..., Any]":
  135. """
  136. Makes sure that:
  137. - a new Sentry trace is started for each task started by Celery Beat and
  138. it is propagated to the task.
  139. - the Sentry Crons information is set in the Celery Beat task's
  140. headers so that is is monitored with Sentry Crons.
  141. After the patched function is called,
  142. Celery Beat will call apply_async to put the task in the queue.
  143. """
  144. # Patch only once
  145. # Can't use __name__ here, because some of our tests mock original_apply_entry
  146. already_patched = "sentry_patched_scheduler" in str(original_function)
  147. if already_patched:
  148. return original_function
  149. from sentry_sdk.integrations.celery import CeleryIntegration
  150. def sentry_patched_scheduler(*args: "Any", **kwargs: "Any") -> None:
  151. integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
  152. if integration is None:
  153. return original_function(*args, **kwargs)
  154. # Tasks started by Celery Beat start a new Trace
  155. scope = sentry_sdk.get_isolation_scope()
  156. scope.set_new_propagation_context()
  157. scope._name = "celery-beat"
  158. scheduler, schedule_entry = args
  159. _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)
  160. return original_function(*args, **kwargs)
  161. return sentry_patched_scheduler
  162. def _patch_beat_apply_entry() -> None:
  163. Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)
  164. def _patch_redbeat_apply_async() -> None:
  165. if RedBeatScheduler is None:
  166. return
  167. RedBeatScheduler.apply_async = _wrap_beat_scheduler(RedBeatScheduler.apply_async)
  168. def _setup_celery_beat_signals(monitor_beat_tasks: bool) -> None:
  169. if monitor_beat_tasks:
  170. task_success.connect(crons_task_success)
  171. task_failure.connect(crons_task_failure)
  172. task_retry.connect(crons_task_retry)
  173. def crons_task_success(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
  174. logger.debug("celery_task_success %s", sender)
  175. headers = _get_headers(sender)
  176. if "sentry-monitor-slug" not in headers:
  177. return
  178. monitor_config = headers.get("sentry-monitor-config", {})
  179. start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
  180. capture_checkin(
  181. monitor_slug=headers["sentry-monitor-slug"],
  182. monitor_config=monitor_config,
  183. check_in_id=headers["sentry-monitor-check-in-id"],
  184. duration=(
  185. _now_seconds_since_epoch() - float(start_timestamp_s)
  186. if start_timestamp_s
  187. else None
  188. ),
  189. status=MonitorStatus.OK,
  190. )
  191. def crons_task_failure(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
  192. logger.debug("celery_task_failure %s", sender)
  193. headers = _get_headers(sender)
  194. if "sentry-monitor-slug" not in headers:
  195. return
  196. monitor_config = headers.get("sentry-monitor-config", {})
  197. start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
  198. capture_checkin(
  199. monitor_slug=headers["sentry-monitor-slug"],
  200. monitor_config=monitor_config,
  201. check_in_id=headers["sentry-monitor-check-in-id"],
  202. duration=(
  203. _now_seconds_since_epoch() - float(start_timestamp_s)
  204. if start_timestamp_s
  205. else None
  206. ),
  207. status=MonitorStatus.ERROR,
  208. )
  209. def crons_task_retry(sender: "Task", **kwargs: "dict[Any, Any]") -> None:
  210. logger.debug("celery_task_retry %s", sender)
  211. headers = _get_headers(sender)
  212. if "sentry-monitor-slug" not in headers:
  213. return
  214. monitor_config = headers.get("sentry-monitor-config", {})
  215. start_timestamp_s = headers.get("sentry-monitor-start-timestamp-s")
  216. capture_checkin(
  217. monitor_slug=headers["sentry-monitor-slug"],
  218. monitor_config=monitor_config,
  219. check_in_id=headers["sentry-monitor-check-in-id"],
  220. duration=(
  221. _now_seconds_since_epoch() - float(start_timestamp_s)
  222. if start_timestamp_s
  223. else None
  224. ),
  225. status=MonitorStatus.ERROR,
  226. )