tasks.py 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. from functools import wraps
  2. import sentry_sdk
  3. from sentry_sdk.consts import OP
  4. from sentry_sdk.utils import qualname_from_function
  5. try:
  6. # django.tasks were added in Django 6.0
  7. from django.tasks.base import Task
  8. except ImportError:
  9. Task = None
  10. from typing import TYPE_CHECKING
  11. if TYPE_CHECKING:
  12. from typing import Any
  13. def patch_tasks() -> None:
  14. if Task is None:
  15. return
  16. old_task_enqueue = Task.enqueue
  17. @wraps(old_task_enqueue)
  18. def _sentry_enqueue(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
  19. from sentry_sdk.integrations.django import DjangoIntegration
  20. integration = sentry_sdk.get_client().get_integration(DjangoIntegration)
  21. if integration is None:
  22. return old_task_enqueue(self, *args, **kwargs)
  23. name = qualname_from_function(self.func) or "<unknown Django task>"
  24. with sentry_sdk.start_span(
  25. op=OP.QUEUE_SUBMIT_DJANGO, name=name, origin=DjangoIntegration.origin
  26. ):
  27. return old_task_enqueue(self, *args, **kwargs)
  28. Task.enqueue = _sentry_enqueue