| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- import asyncio
- import logging
- import os
- import time
- from collections import OrderedDict, defaultdict
- from concurrent.futures import ThreadPoolExecutor
- from datetime import datetime
- from itertools import islice
- from typing import Dict, List, Union
- import aiohttp.web
- import ray
- import ray.dashboard.optional_utils as dashboard_optional_utils
- import ray.dashboard.utils as dashboard_utils
- from ray._common.usage.usage_lib import TagKey, record_extra_usage_tag
- from ray._common.utils import get_or_create_event_loop
- from ray._private.ray_constants import env_integer
- from ray.dashboard.consts import (
- RAY_STATE_SERVER_MAX_HTTP_REQUEST,
- RAY_STATE_SERVER_MAX_HTTP_REQUEST_ALLOWED,
- RAY_STATE_SERVER_MAX_HTTP_REQUEST_ENV_NAME,
- )
- from ray.dashboard.modules.event.event_utils import monitor_events, parse_event_strings
- from ray.dashboard.state_api_utils import do_filter, handle_list_api
- from ray.dashboard.subprocesses.module import SubprocessModule
- from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
- from ray.util.state.common import ClusterEventState, ListApiOptions, ListApiResponse
logger = logging.getLogger(__name__)

# Per-job event container (event_id -> event dict). An OrderedDict is used so
# the oldest-inserted events can be evicted first (see `popitem(last=False)`).
JobEvents = OrderedDict
# NOTE(review): presumably registers JobEvents so the dashboard's JSON
# conversion accepts it -- confirm against dashboard_utils.
dashboard_utils._json_compatible_types.add(JobEvents)

# Per-job cap on cached events; override via RAY_DASHBOARD_MAX_EVENTS_TO_CACHE.
MAX_EVENTS_TO_CACHE = int(os.getenv("RAY_DASHBOARD_MAX_EVENTS_TO_CACHE", 10000))

# NOTE: Executor in this head is intentionally constrained to just 1 thread by
#       default to limit its concurrency, therefore reducing potential for
#       GIL contention
RAY_DASHBOARD_EVENT_HEAD_TPE_MAX_WORKERS = env_integer(
    "RAY_DASHBOARD_EVENT_HEAD_TPE_MAX_WORKERS", 1
)
async def _list_cluster_events_impl(
    *, all_events, executor: ThreadPoolExecutor, option: ListApiOptions
) -> ListApiResponse:
    """
    Build the list-API response for every cached cluster event.

    Kept as a free function (rather than a method) so unit tests can call it
    directly.

    Args:
        all_events: Mapping of job id to a mapping of event id to event dict.
            Every event dict must carry a "timestamp" entry; a "time" entry
            is written into each event dict in place.
        executor: Thread pool on which the aggregation runs, so the work
            happens off the event loop thread.
        option: List API options; its `filters`, `detail` and `limit`
            attributes are applied to the result.

    Returns:
        A `ListApiResponse` whose `result` holds the filtered events in
        ascending "timestamp" order, capped at `option.limit`.
        The schema of each returned "dict" is equivalent to the
        `ClusterEventState` protobuf message.
    """

    def transform(all_events) -> ListApiResponse:
        flattened = []
        for job_events in all_events.values():
            for entry in job_events.values():
                # Attach a human-readable rendering of the timestamp.
                entry["time"] = str(datetime.fromtimestamp(int(entry["timestamp"])))
                flattened.append(entry)

        # Nothing is truncated here, so both counters report the same size.
        event_count = len(flattened)

        # Sort to make the output deterministic.
        flattened.sort(key=lambda item: item["timestamp"])

        matching = do_filter(
            flattened, option.filters, ClusterEventState, option.detail
        )
        matching_count = len(matching)

        # Apply the limit only after counting the filtered events.
        limited = list(islice(matching, option.limit))

        return ListApiResponse(
            result=limited,
            total=event_count,
            num_after_truncation=event_count,
            num_filtered=matching_count,
        )

    loop = get_or_create_event_loop()
    return await loop.run_in_executor(executor, transform, all_events)
class EventHead(
    SubprocessModule,
    dashboard_utils.RateLimitedModule,
):
    """Dashboard module that caches cluster events per job and serves them.

    Events enter the cache through `POST /report_events` and through the
    callback handed to `monitor_events` in `run()`. They are read back via
    `GET /events` and `GET /api/v0/cluster_events`.
    """

    def __init__(self, *args, **kwargs):
        """Initialize both base classes, the event directory and the cache."""
        SubprocessModule.__init__(self, *args, **kwargs)
        # Use the configured request limit, but never more than the allowed
        # maximum.
        dashboard_utils.RateLimitedModule.__init__(
            self,
            min(
                RAY_STATE_SERVER_MAX_HTTP_REQUEST,
                RAY_STATE_SERVER_MAX_HTTP_REQUEST_ALLOWED,
            ),
        )
        self._event_dir = os.path.join(self.log_dir, "events")
        os.makedirs(self._event_dir, exist_ok=True)
        # Set in `run()` to whatever `monitor_events` returns.
        self._monitor: Union[asyncio.Task, None] = None
        # Counters reported by `_periodic_state_print`.
        self.total_report_events_count = 0
        self.total_events_received = 0
        self.module_started = time.monotonic()
        # {job_id hex(str): {event_id (str): event (dict)}}
        self.events: Dict[str, JobEvents] = defaultdict(JobEvents)

        self._executor = ThreadPoolExecutor(
            max_workers=RAY_DASHBOARD_EVENT_HEAD_TPE_MAX_WORKERS,
            thread_name_prefix="event_head_executor",
        )

        # To init gcs_client in internal_kv for record_extra_usage_tag.
        assert self.gcs_client is not None
        assert ray.experimental.internal_kv._internal_kv_initialized()

    async def limit_handler_(self):
        """Build the error response for "too many in-progress requests".

        NOTE(review): presumably invoked by `RateLimitedModule` when the
        concurrent-call limit is exceeded -- confirm against that class.
        """
        return dashboard_optional_utils.rest_response(
            status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
            error_message=(
                "Max number of in-progress requests="
                f"{self.max_num_call_} reached. "
                "To set a higher limit, set environment variable: "
                f"export {RAY_STATE_SERVER_MAX_HTTP_REQUEST_ENV_NAME}='xxx'. "
                f"Max allowed = {RAY_STATE_SERVER_MAX_HTTP_REQUEST_ALLOWED}"
            ),
            result=None,
        )

    def _update_events(self, event_list):
        """Merge events into the per-job cache and trim oversized jobs.

        Args:
            event_list: Iterable of event dicts. Each must have an
                "event_id" key. An optional "custom_fields" mapping may
                carry a "job_id"; events without a (non-empty) job id are
                stored under "global".
        """
        # {job_id: {event_id: event}}
        all_job_events = defaultdict(JobEvents)
        for event in event_list:
            custom_fields = event.get("custom_fields")
            if custom_fields:
                # An explicitly empty/None job_id also falls back to "global".
                job_id = custom_fields.get("job_id", "global") or "global"
            else:
                job_id = "global"
            all_job_events[job_id][event["event_id"]] = event

        for job_id, new_job_events in all_job_events.items():
            job_events = self.events[job_id]
            job_events.update(new_job_events)

            # Limit the # of events cached if it exceeds the threshold.
            # Trimming only starts once the cap is exceeded by 10% and then
            # drops the oldest-inserted events back down to the cap, so
            # eviction happens in batches rather than on every insert.
            if len(job_events) > MAX_EVENTS_TO_CACHE * 1.1:
                while len(job_events) > MAX_EVENTS_TO_CACHE:
                    job_events.popitem(last=False)

    @routes.post("/report_events")
    async def report_events(self, request):
        """
        Report events to the dashboard.

        The request body is a JSON array of event strings in type string.
        Response should contain {"success": true}.

        Raises:
            aiohttp.web.HTTPBadRequest: If the body cannot be parsed as JSON
                or is not a JSON array.
        """
        try:
            request_body: List[str] = await request.json()
        except Exception as e:
            logger.warning(f"Failed to parse request body: {request=}, {e=}")
            raise aiohttp.web.HTTPBadRequest()
        if not isinstance(request_body, list):
            logger.warning(f"Request body is not a list, {request_body=}")
            raise aiohttp.web.HTTPBadRequest()

        events = parse_event_strings(request_body)
        logger.debug("Received %d events", len(events))
        self._update_events(events)
        self.total_report_events_count += 1
        self.total_events_received += len(events)
        return dashboard_optional_utils.rest_response(
            success=True,
            message="",
            status_code=dashboard_utils.HTTPStatusCode.OK,
        )

    async def _periodic_state_print(self):
        """Return event/request counters and uptime for this module.

        Returns:
            None until at least one event and one report request have been
            received; afterwards a dict with the totals and the seconds
            elapsed since the module was constructed.
        """
        if self.total_events_received <= 0 or self.total_report_events_count <= 0:
            return

        elapsed = time.monotonic() - self.module_started
        return {
            "total_events_received": self.total_events_received,
            "Total_requests_received": self.total_report_events_count,
            "total_uptime": elapsed,
        }

    @routes.get("/events")
    @dashboard_optional_utils.aiohttp_cache
    async def get_event(self, req) -> aiohttp.web.Response:
        """Return cached events, for one job or for all jobs.

        With a `job_id` query parameter the response carries that job's
        events as a list (empty if the job is unknown). Without it the
        response carries a `{job_id: [events]}` mapping for every cached job.
        """
        job_id = req.query.get("job_id")
        if job_id is None:
            all_events = {
                cached_job_id: list(cached_events.values())
                for cached_job_id, cached_events in self.events.items()
            }
            return dashboard_optional_utils.rest_response(
                status_code=dashboard_utils.HTTPStatusCode.OK,
                message="All events fetched.",
                events=all_events,
            )

        # Use .get() rather than indexing: self.events is a defaultdict, so
        # indexing an unknown job_id would insert (and keep) an empty entry
        # for every such read request.
        job_events = self.events.get(job_id)
        return dashboard_optional_utils.rest_response(
            status_code=dashboard_utils.HTTPStatusCode.OK,
            message="Job events fetched.",
            job_id=job_id,
            events=list(job_events.values()) if job_events is not None else [],
        )

    @routes.get("/api/v0/cluster_events")
    @dashboard_utils.RateLimitedModule.enforce_max_concurrent_calls
    async def list_cluster_events(
        self, req: aiohttp.web.Request
    ) -> aiohttp.web.Response:
        """Serve the cluster-events list API from the in-memory cache."""
        record_extra_usage_tag(TagKey.CORE_STATE_API_LIST_CLUSTER_EVENTS, "1")

        async def list_api_fn(option: ListApiOptions):
            return await _list_cluster_events_impl(
                all_events=self.events, executor=self._executor, option=option
            )

        return await handle_list_api(list_api_fn, req)

    async def run(self):
        """Run the base module, then start monitoring the event directory.

        `monitor_events` receives the event directory, a callback that
        parses incoming data and stores it in the cache, and this module's
        executor; its return value is kept in `self._monitor`.
        """
        await super().run()
        self._monitor = monitor_events(
            self._event_dir,
            lambda data: self._update_events(parse_event_strings(data)),
            self._executor,
        )
|