event_agent.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. import asyncio
  2. import logging
  3. import os
  4. import time
  5. from concurrent.futures import ThreadPoolExecutor
  6. from typing import Union
  7. import ray._private.ray_constants as ray_constants
  8. import ray.dashboard.consts as dashboard_consts
  9. import ray.dashboard.utils as dashboard_utils
  10. from ray._private.authentication.http_token_authentication import (
  11. get_auth_headers_if_auth_enabled,
  12. )
  13. from ray.dashboard.modules.event import event_consts
  14. from ray.dashboard.modules.event.event_utils import monitor_events
  15. from ray.dashboard.utils import async_loop_forever, create_task
  16. logger = logging.getLogger(__name__)
  17. # NOTE: Executor in this head is intentionally constrained to just 1 thread by
  18. # default to limit its concurrency, therefore reducing potential for
  19. # GIL contention
  20. RAY_DASHBOARD_EVENT_AGENT_TPE_MAX_WORKERS = ray_constants.env_integer(
  21. "RAY_DASHBOARD_EVENT_AGENT_TPE_MAX_WORKERS", 1
  22. )
  23. class EventAgent(dashboard_utils.DashboardAgentModule):
  24. def __init__(self, dashboard_agent):
  25. super().__init__(dashboard_agent)
  26. self._event_dir = os.path.join(self._dashboard_agent.log_dir, "events")
  27. os.makedirs(self._event_dir, exist_ok=True)
  28. self._monitor: Union[asyncio.Task, None] = None
  29. # Lazy initialized on first use. Once initialized, it will not be
  30. # changed.
  31. self._dashboard_http_address = None
  32. self._cached_events = asyncio.Queue(event_consts.EVENT_AGENT_CACHE_SIZE)
  33. self._gcs_client = dashboard_agent.gcs_client
  34. # Total number of event created from this agent.
  35. self.total_event_reported = 0
  36. # Total number of event report request sent.
  37. self.total_request_sent = 0
  38. self.module_started = time.monotonic()
  39. self._executor = ThreadPoolExecutor(
  40. max_workers=RAY_DASHBOARD_EVENT_AGENT_TPE_MAX_WORKERS,
  41. thread_name_prefix="event_agent_executor",
  42. )
  43. logger.info("Event agent cache buffer size: %s", self._cached_events.maxsize)
  44. async def _get_dashboard_http_address(self):
  45. """
  46. Lazily get the dashboard http address from InternalKV. If it's not set, sleep
  47. and retry forever.
  48. """
  49. while True:
  50. if self._dashboard_http_address:
  51. return self._dashboard_http_address
  52. try:
  53. dashboard_http_address = await self._gcs_client.async_internal_kv_get(
  54. ray_constants.DASHBOARD_ADDRESS.encode(),
  55. namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
  56. timeout=dashboard_consts.GCS_RPC_TIMEOUT_SECONDS,
  57. )
  58. if not dashboard_http_address:
  59. raise ValueError("Dashboard http address not found in InternalKV.")
  60. self._dashboard_http_address = dashboard_http_address.decode()
  61. return self._dashboard_http_address
  62. except Exception:
  63. logger.exception("Get dashboard http address failed.")
  64. await asyncio.sleep(1)
  65. @async_loop_forever(event_consts.EVENT_AGENT_REPORT_INTERVAL_SECONDS)
  66. async def report_events(self):
  67. """Report events from cached events queue. Reconnect to dashboard if
  68. report failed. Log error after retry EVENT_AGENT_RETRY_TIMES.
  69. This method will never returns.
  70. """
  71. dashboard_http_address = await self._get_dashboard_http_address()
  72. data = await self._cached_events.get()
  73. self.total_event_reported += len(data)
  74. last_exception = None
  75. for _ in range(event_consts.EVENT_AGENT_RETRY_TIMES):
  76. try:
  77. logger.debug("Report %s events.", len(data))
  78. async with self._dashboard_agent.http_session.post(
  79. f"{dashboard_http_address}/report_events",
  80. json=data,
  81. headers=get_auth_headers_if_auth_enabled({}),
  82. ) as response:
  83. response.raise_for_status()
  84. self.total_request_sent += 1
  85. break
  86. except Exception as e:
  87. logger.warning(f"Report event failed, retrying... {e}")
  88. last_exception = e
  89. else:
  90. data_str = str(data)
  91. limit = event_consts.LOG_ERROR_EVENT_STRING_LENGTH_LIMIT
  92. logger.error(
  93. "Report event failed: %s",
  94. data_str[:limit] + (data_str[limit:] and "..."),
  95. exc_info=last_exception,
  96. )
  97. async def get_internal_states(self):
  98. if self.total_event_reported <= 0 or self.total_request_sent <= 0:
  99. return
  100. elapsed = time.monotonic() - self.module_started
  101. return {
  102. "total_events_reported": self.total_event_reported,
  103. "Total_report_request": self.total_request_sent,
  104. "queue_size": self._cached_events.qsize(),
  105. "total_uptime": elapsed,
  106. }
  107. async def run(self, server):
  108. # Start monitor task.
  109. self._monitor = monitor_events(
  110. self._event_dir,
  111. lambda data: create_task(self._cached_events.put(data)),
  112. self._executor,
  113. )
  114. await asyncio.gather(
  115. self.report_events(),
  116. )
  117. @staticmethod
  118. def is_minimal_module():
  119. return False