event_head.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. import asyncio
  2. import logging
  3. import os
  4. import time
  5. from collections import OrderedDict, defaultdict
  6. from concurrent.futures import ThreadPoolExecutor
  7. from datetime import datetime
  8. from itertools import islice
  9. from typing import Dict, List, Union
  10. import aiohttp.web
  11. import ray
  12. import ray.dashboard.optional_utils as dashboard_optional_utils
  13. import ray.dashboard.utils as dashboard_utils
  14. from ray._common.usage.usage_lib import TagKey, record_extra_usage_tag
  15. from ray._common.utils import get_or_create_event_loop
  16. from ray._private.ray_constants import env_integer
  17. from ray.dashboard.consts import (
  18. RAY_STATE_SERVER_MAX_HTTP_REQUEST,
  19. RAY_STATE_SERVER_MAX_HTTP_REQUEST_ALLOWED,
  20. RAY_STATE_SERVER_MAX_HTTP_REQUEST_ENV_NAME,
  21. )
  22. from ray.dashboard.modules.event.event_utils import monitor_events, parse_event_strings
  23. from ray.dashboard.state_api_utils import do_filter, handle_list_api
  24. from ray.dashboard.subprocesses.module import SubprocessModule
  25. from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
  26. from ray.util.state.common import ClusterEventState, ListApiOptions, ListApiResponse
  # Module-level logger, named after this module.
  logger = logging.getLogger(__name__)

  # Per-job event container, keyed by event id. An OrderedDict is used so the
  # oldest entries can be dropped first (see the popitem(last=False) call in
  # EventHead._update_events).
  JobEvents = OrderedDict
  # Adds the alias to dashboard_utils._json_compatible_types.
  # NOTE(review): presumably this lets the dashboard's JSON conversion accept
  # JobEvents values -- confirm in ray.dashboard.utils.
  dashboard_utils._json_compatible_types.add(JobEvents)

  # Per-job cache size that EventHead._update_events trims back to; read once at
  # import time from RAY_DASHBOARD_MAX_EVENTS_TO_CACHE (default 10000).
  MAX_EVENTS_TO_CACHE = int(os.environ.get("RAY_DASHBOARD_MAX_EVENTS_TO_CACHE", 10000))

  # NOTE: Executor in this head is intentionally constrained to just 1 thread by
  #       default to limit its concurrency, therefore reducing potential for
  #       GIL contention
  RAY_DASHBOARD_EVENT_HEAD_TPE_MAX_WORKERS = env_integer(
      "RAY_DASHBOARD_EVENT_HEAD_TPE_MAX_WORKERS", 1
  )
  async def _list_cluster_events_impl(
      *, all_events, executor: ThreadPoolExecutor, option: ListApiOptions
  ) -> ListApiResponse:
      """
      List all cluster events from the cluster. Made a free function to allow unit tests.

      The caller's ``all_events`` is treated as read-only: the per-job containers
      are snapshotted before any work is handed to ``executor``, and each event is
      shallow-copied before the derived ``"time"`` field is attached.

      Args:
          all_events: Mapping of job id to a mapping of event id to event dict.
              Every event must have a ``"timestamp"`` entry accepted by ``int()``.
          executor: Thread pool on which the sort/filter/truncate work runs.
          option: List options; ``filters``, ``detail`` and ``limit`` are read.

      Returns:
          A list of cluster events in the cluster.
          The schema of returned "dict" is equivalent to the
          `ClusterEventState` protobuf message.
      """
      # Snapshot the containers on the calling thread, before dispatching to the
      # executor. The worker thread then only walks private lists, so events
      # added to `all_events` while the worker runs cannot invalidate its
      # iteration. This copies references only, not the events themselves.
      snapshot = [list(job_events.values()) for job_events in all_events.values()]

      def transform(job_event_lists) -> ListApiResponse:
          # Build copies carrying the human-readable "time" field instead of
          # writing it into the cached events, which other endpoints also serve.
          result = [
              {**event, "time": str(datetime.fromtimestamp(int(event["timestamp"])))}
              for events in job_event_lists
              for event in events
          ]
          num_after_truncation = len(result)
          # Sort to make the output deterministic.
          result.sort(key=lambda entry: entry["timestamp"])
          total = len(result)
          result = do_filter(result, option.filters, ClusterEventState, option.detail)
          num_filtered = len(result)
          # Keep at most `option.limit` entries.
          result = list(islice(result, option.limit))
          return ListApiResponse(
              result=result,
              total=total,
              num_after_truncation=num_after_truncation,
              num_filtered=num_filtered,
          )

      return await get_or_create_event_loop().run_in_executor(
          executor, transform, snapshot
      )
  class EventHead(
      SubprocessModule,
      dashboard_utils.RateLimitedModule,
  ):
      """Dashboard module that caches events per job and serves them over HTTP.

      Events enter the cache through ``POST /report_events`` and through the
      callback registered with ``monitor_events`` in ``run``. They are exposed
      via ``GET /events`` and ``GET /api/v0/cluster_events``.
      """

      def __init__(self, *args, **kwargs):
          """Initialise both base classes, the event directory and the cache.

          All positional and keyword arguments are forwarded to
          ``SubprocessModule.__init__``.
          """
          SubprocessModule.__init__(self, *args, **kwargs)
          # The concurrent-call limit is the configured value capped at the
          # maximum allowed value.
          dashboard_utils.RateLimitedModule.__init__(
              self,
              min(
                  RAY_STATE_SERVER_MAX_HTTP_REQUEST,
                  RAY_STATE_SERVER_MAX_HTTP_REQUEST_ALLOWED,
              ),
          )
          self._event_dir = os.path.join(self.log_dir, "events")
          os.makedirs(self._event_dir, exist_ok=True)
          # Set by run() to whatever monitor_events returns.
          self._monitor: Union[asyncio.Task, None] = None
          # Counters reported by _periodic_state_print.
          self.total_report_events_count = 0
          self.total_events_received = 0
          self.module_started = time.monotonic()
          # {job_id hex(str): {event_id (str): event (dict)}}
          self.events: Dict[str, JobEvents] = defaultdict(JobEvents)
          self._executor = ThreadPoolExecutor(
              max_workers=RAY_DASHBOARD_EVENT_HEAD_TPE_MAX_WORKERS,
              thread_name_prefix="event_head_executor",
          )

          # To init gcs_client in internal_kv for record_extra_usage_tag.
          assert self.gcs_client is not None
          assert ray.experimental.internal_kv._internal_kv_initialized()

      async def limit_handler_(self):
          """Build the error response stating that the in-progress request limit
          was reached.

          NOTE(review): presumably invoked by ``RateLimitedModule`` when
          ``enforce_max_concurrent_calls`` rejects a call -- confirm there.
          """
          return dashboard_optional_utils.rest_response(
              status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
              error_message=(
                  "Max number of in-progress requests="
                  f"{self.max_num_call_} reached. "
                  "To set a higher limit, set environment variable: "
                  f"export {RAY_STATE_SERVER_MAX_HTTP_REQUEST_ENV_NAME}='xxx'. "
                  f"Max allowed = {RAY_STATE_SERVER_MAX_HTTP_REQUEST_ALLOWED}"
              ),
              result=None,
          )

      def _update_events(self, event_list):
          """Merge ``event_list`` into the per-job cache and trim oversized jobs.

          Args:
              event_list: Iterable of event dicts. Each must contain
                  ``"event_id"``; the job id is read from
                  ``event["custom_fields"]["job_id"]`` and falls back to
                  ``"global"`` when absent or empty.

          Raises:
              KeyError: If an event has no ``"event_id"``.
          """
          # {job_id: {event_id: event}}
          all_job_events = defaultdict(JobEvents)
          for event in event_list:
              event_id = event["event_id"]
              custom_fields = event.get("custom_fields")
              if custom_fields:
                  job_id = custom_fields.get("job_id", "global") or "global"
              else:
                  job_id = "global"
              all_job_events[job_id][event_id] = event

          for job_id, new_job_events in all_job_events.items():
              job_events = self.events[job_id]
              job_events.update(new_job_events)

              # Limit the # of events cached if it exceeds the threshold.
              # Trimming only starts 10% above the cap, so a job sitting at the
              # cap is not trimmed on every single update; the oldest entries
              # are dropped first.
              if len(job_events) > MAX_EVENTS_TO_CACHE * 1.1:
                  while len(job_events) > MAX_EVENTS_TO_CACHE:
                      job_events.popitem(last=False)

      @routes.post("/report_events")
      async def report_events(self, request):
          """
          Report events to the dashboard.

          The request body is a JSON array of event strings in type string.
          Response should contain {"success": true}.

          Raises:
              aiohttp.web.HTTPBadRequest: If the body cannot be parsed as JSON or
                  is not a list.
          """
          try:
              request_body: List[str] = await request.json()
          except Exception as e:
              logger.warning(f"Failed to parse request body: {request=}, {e=}")
              raise aiohttp.web.HTTPBadRequest()
          if not isinstance(request_body, list):
              logger.warning(f"Request body is not a list, {request_body=}")
              raise aiohttp.web.HTTPBadRequest()

          events = parse_event_strings(request_body)
          logger.debug("Received %d events", len(events))
          self._update_events(events)
          self.total_report_events_count += 1
          self.total_events_received += len(events)
          return dashboard_optional_utils.rest_response(
              success=True,
              message="",
              status_code=dashboard_utils.HTTPStatusCode.OK,
          )

      async def _periodic_state_print(self):
          """Return counters for this module, or ``None`` before any event arrived.

          Returns:
              A dict with the number of events received, the number of
              ``report_events`` requests handled and the seconds elapsed since
              construction; ``None`` while either counter is still zero.
          """
          if self.total_events_received <= 0 or self.total_report_events_count <= 0:
              return

          elapsed = time.monotonic() - self.module_started
          return {
              "total_events_received": self.total_events_received,
              "Total_requests_received": self.total_report_events_count,
              "total_uptime": elapsed,
          }

      @routes.get("/events")
      @dashboard_optional_utils.aiohttp_cache
      async def get_event(self, req) -> aiohttp.web.Response:
          """Return cached events, for one job or for all jobs.

          With a ``job_id`` query parameter, only that job's events are returned
          (an empty list when the job is unknown). Without it, the response maps
          every cached job id to its list of events.
          """
          job_id = req.query.get("job_id")
          if job_id is None:
              all_events = {
                  cached_job_id: list(job_events.values())
                  for cached_job_id, job_events in self.events.items()
              }
              return dashboard_optional_utils.rest_response(
                  status_code=dashboard_utils.HTTPStatusCode.OK,
                  message="All events fetched.",
                  events=all_events,
              )

          # Use .get() rather than indexing: self.events is a defaultdict, and
          # indexing it with a caller-supplied id would insert a permanent empty
          # entry for every unknown job id that is queried.
          job_events = self.events.get(job_id)
          events = [] if job_events is None else list(job_events.values())
          return dashboard_optional_utils.rest_response(
              status_code=dashboard_utils.HTTPStatusCode.OK,
              message="Job events fetched.",
              job_id=job_id,
              events=events,
          )

      @routes.get("/api/v0/cluster_events")
      @dashboard_utils.RateLimitedModule.enforce_max_concurrent_calls
      async def list_cluster_events(
          self, req: aiohttp.web.Request
      ) -> aiohttp.web.Response:
          """Serve the cluster-events list API from the cached events.

          Records a usage tag, then delegates option handling to
          ``handle_list_api`` and the listing itself to
          ``_list_cluster_events_impl``.
          """
          record_extra_usage_tag(TagKey.CORE_STATE_API_LIST_CLUSTER_EVENTS, "1")

          async def list_api_fn(option: ListApiOptions):
              return await _list_cluster_events_impl(
                  all_events=self.events, executor=self._executor, option=option
              )

          return await handle_list_api(list_api_fn, req)

      async def run(self):
          """Run the base module, then start monitoring the event directory.

          The callback parses the data it is given with ``parse_event_strings``
          and merges the result into the cache.
          NOTE(review): presumably ``monitor_events`` watches ``self._event_dir``
          for new event data and uses the executor for file access -- confirm in
          event_utils.
          """
          await super().run()
          self._monitor = monitor_events(
              self._event_dir,
              lambda data: self._update_events(parse_event_strings(data)),
              self._executor,
          )