| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145 |
- import json
- import logging
- import sys
- from collections import defaultdict
- from threading import Lock
- from typing import Dict, Optional
- import ray
- from ray._common.constants import HEAD_NODE_RESOURCE_NAME, NODE_ID_PREFIX
- from ray._common.utils import binary_to_hex, decode, hex_to_binary
- from ray._private.client_mode_hook import client_mode_hook
- from ray._private.protobuf_compat import message_to_dict
- from ray._private.utils import (
- validate_actor_state_name,
- )
- from ray._raylet import GlobalStateAccessor
- from ray.core.generated import autoscaler_pb2, common_pb2, gcs_pb2
- from ray.util.annotations import DeveloperAPI
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
- class GlobalState:
- """A class used to interface with the Ray control state.
- Attributes:
- global_state_accessor: The client used to query gcs table from gcs
- server.
- """
- def __init__(self):
- """Create a GlobalState object."""
- # Args used for lazy init of this object.
- self.gcs_options = None
- self._global_state_accessor = None
- self._init_lock = Lock()
- def _connect_and_get_accessor(self) -> GlobalStateAccessor:
- """
- This lazily initializes clients needed for state accessors and returns a connected global state accessor.
- Returns:
- GlobalStateAccessor: A connected global state accessor.
- Raises:
- RuntimeError: An exception is raised if ray.init() has not been called yet.
- """
- with self._init_lock:
- if self._global_state_accessor is not None:
- return self._global_state_accessor
- if self.gcs_options is None:
- raise ray.exceptions.RaySystemError(
- "Ray has not been started yet. Trying to use state API before ray.init() has been called."
- )
- self._global_state_accessor = GlobalStateAccessor(self.gcs_options)
- connected = self._global_state_accessor.connect()
- if not connected:
- self._global_state_accessor = None
- raise ray.exceptions.RaySystemError(
- "Failed to connect to GCS. Please check if the GCS server is running "
- "and if this node can connect to the head node."
- )
- return self._global_state_accessor
- def disconnect(self):
- """Disconnect global state from GCS."""
- with self._init_lock:
- self.gcs_options = None
- if self._global_state_accessor is not None:
- self._global_state_accessor = None
- def _initialize_global_state(self, gcs_options):
- """Set args for lazily initialization of the GlobalState object.
- It's possible that certain keys in gcs kv may not have been fully
- populated yet. In this case, we will retry this method until they have
- been populated or we exceed a timeout.
- Args:
- gcs_options: The client options for gcs
- """
- # Save args for lazy init of global state. This avoids opening extra
- # gcs connections from each worker until needed.
- with self._init_lock:
- self.gcs_options = gcs_options
- def actor_table(
- self,
- actor_id: Optional[str],
- job_id: Optional[ray.JobID] = None,
- actor_state_name: Optional[str] = None,
- ):
- """Fetch and parse the actor table information for a single actor ID.
- Args:
- actor_id: A hex string of the actor ID to fetch information about.
- If this is None, then the actor table is fetched.
- If this is not None, `job_id` and `actor_state_name`
- will not take effect.
- job_id: To filter actors by job_id, which is of type `ray.JobID`.
- You can use the `ray.get_runtime_context().job_id` function
- to get the current job ID
- actor_state_name: To filter actors based on actor state,
- which can be one of the following: "DEPENDENCIES_UNREADY",
- "PENDING_CREATION", "ALIVE", "RESTARTING", or "DEAD".
- Returns:
- Information from the actor table.
- """
- accessor = self._connect_and_get_accessor()
- if actor_id is not None:
- actor_id = ray.ActorID(hex_to_binary(actor_id))
- actor_info = accessor.get_actor_info(actor_id)
- if actor_info is None:
- return {}
- else:
- actor_table_data = gcs_pb2.ActorTableData.FromString(actor_info)
- return self._gen_actor_info(actor_table_data)
- else:
- validate_actor_state_name(actor_state_name)
- actor_table = accessor.get_actor_table(job_id, actor_state_name)
- results = {}
- for i in range(len(actor_table)):
- actor_table_data = gcs_pb2.ActorTableData.FromString(actor_table[i])
- results[
- binary_to_hex(actor_table_data.actor_id)
- ] = self._gen_actor_info(actor_table_data)
- return results
- def _gen_actor_info(self, actor_table_data):
- """Parse actor table data.
- Returns:
- Information from actor table.
- """
- actor_info = {
- "ActorID": binary_to_hex(actor_table_data.actor_id),
- "ActorClassName": actor_table_data.class_name,
- "IsDetached": actor_table_data.is_detached,
- "Name": actor_table_data.name,
- "JobID": binary_to_hex(actor_table_data.job_id),
- "Address": {
- "IPAddress": actor_table_data.address.ip_address,
- "Port": actor_table_data.address.port,
- "NodeID": binary_to_hex(actor_table_data.address.node_id),
- },
- "OwnerAddress": {
- "IPAddress": actor_table_data.owner_address.ip_address,
- "Port": actor_table_data.owner_address.port,
- "NodeID": binary_to_hex(actor_table_data.owner_address.node_id),
- },
- "State": gcs_pb2.ActorTableData.ActorState.DESCRIPTOR.values_by_number[
- actor_table_data.state
- ].name,
- "NumRestarts": actor_table_data.num_restarts,
- "Timestamp": actor_table_data.timestamp,
- "StartTime": actor_table_data.start_time,
- "EndTime": actor_table_data.end_time,
- "DeathCause": actor_table_data.death_cause,
- "Pid": actor_table_data.pid,
- }
- return actor_info
- def node_table(self):
- """Fetch and parse the Gcs node info table.
- Returns:
- Information about the node in the cluster.
- """
- accessor = self._connect_and_get_accessor()
- return accessor.get_node_table()
- def job_table(self):
- """Fetch and parse the gcs job table.
- Returns:
- Information about the Ray jobs in the cluster,
- namely a list of dicts with keys:
- - "JobID" (identifier for the job),
- - "DriverIPAddress" (IP address of the driver for this job),
- - "DriverPid" (process ID of the driver for this job),
- - "StartTime" (UNIX timestamp of the start time of this job),
- - "StopTime" (UNIX timestamp of the stop time of this job, if any)
- """
- accessor = self._connect_and_get_accessor()
- job_table = accessor.get_job_table(
- skip_submission_job_info_field=True, skip_is_running_tasks_field=True
- )
- results = []
- for i in range(len(job_table)):
- entry = gcs_pb2.JobTableData.FromString(job_table[i])
- job_info = {}
- job_info["JobID"] = entry.job_id.hex()
- job_info["DriverIPAddress"] = entry.driver_address.ip_address
- job_info["DriverPid"] = entry.driver_pid
- job_info["Timestamp"] = entry.timestamp
- job_info["StartTime"] = entry.start_time
- job_info["EndTime"] = entry.end_time
- job_info["IsDead"] = entry.is_dead
- job_info["Entrypoint"] = entry.entrypoint
- results.append(job_info)
- return results
- def next_job_id(self):
- """Get next job id from GCS.
- Returns:
- Next job id in the cluster.
- """
- accessor = self._connect_and_get_accessor()
- return ray.JobID.from_int(accessor.get_next_job_id())
- def profile_events(self):
- """Retrieve and return task profiling events from GCS.
- Return:
- Profiling events by component id (e.g. worker id).
- {
- <component_id>: [
- {
- event_type: <event name> ,
- component_id: <i.e. worker id>,
- node_ip_address: <on which node profiling was done>,
- component_type: <i.e. worker/driver>,
- start_time: <unix timestamp in seconds>,
- end_time: <unix timestamp in seconds>,
- extra_data: <e.g. stack trace when error raised>,
- }
- ]
- }
- """
- accessor = self._connect_and_get_accessor()
- result = defaultdict(list)
- task_events = accessor.get_task_events()
- for i in range(len(task_events)):
- event = gcs_pb2.TaskEvents.FromString(task_events[i])
- profile = event.profile_events
- if not profile:
- continue
- component_type = profile.component_type
- component_id = binary_to_hex(profile.component_id)
- node_ip_address = profile.node_ip_address
- for event in profile.events:
- try:
- extra_data = json.loads(event.extra_data)
- except ValueError:
- extra_data = {}
- profile_event = {
- "event_type": event.event_name,
- "component_id": component_id,
- "node_ip_address": node_ip_address,
- "component_type": component_type,
- "start_time": event.start_time,
- "end_time": event.end_time,
- "extra_data": extra_data,
- }
- result[component_id].append(profile_event)
- return dict(result)
- def get_placement_group_by_name(self, placement_group_name, ray_namespace):
- accessor = self._connect_and_get_accessor()
- placement_group_info = accessor.get_placement_group_by_name(
- placement_group_name, ray_namespace
- )
- if placement_group_info is None:
- return None
- else:
- placement_group_table_data = gcs_pb2.PlacementGroupTableData.FromString(
- placement_group_info
- )
- return self._gen_placement_group_info(placement_group_table_data)
- def placement_group_table(self, placement_group_id=None):
- accessor = self._connect_and_get_accessor()
- if placement_group_id is not None:
- placement_group_id = ray.PlacementGroupID(
- hex_to_binary(placement_group_id.hex())
- )
- placement_group_info = accessor.get_placement_group_info(placement_group_id)
- if placement_group_info is None:
- return {}
- else:
- placement_group_info = gcs_pb2.PlacementGroupTableData.FromString(
- placement_group_info
- )
- return self._gen_placement_group_info(placement_group_info)
- else:
- placement_group_table = accessor.get_placement_group_table()
- results = {}
- for placement_group_info in placement_group_table:
- placement_group_table_data = gcs_pb2.PlacementGroupTableData.FromString(
- placement_group_info
- )
- placement_group_id = binary_to_hex(
- placement_group_table_data.placement_group_id
- )
- results[placement_group_id] = self._gen_placement_group_info(
- placement_group_table_data
- )
- return results
- def _gen_placement_group_info(self, placement_group_info):
- # This should be imported here, otherwise, it will error doc build.
- from ray.core.generated.common_pb2 import PlacementStrategy
- def get_state(state):
- if state == gcs_pb2.PlacementGroupTableData.PENDING:
- return "PENDING"
- elif state == gcs_pb2.PlacementGroupTableData.PREPARED:
- return "PREPARED"
- elif state == gcs_pb2.PlacementGroupTableData.CREATED:
- return "CREATED"
- elif state == gcs_pb2.PlacementGroupTableData.RESCHEDULING:
- return "RESCHEDULING"
- else:
- return "REMOVED"
- def get_strategy(strategy):
- if strategy == PlacementStrategy.PACK:
- return "PACK"
- elif strategy == PlacementStrategy.STRICT_PACK:
- return "STRICT_PACK"
- elif strategy == PlacementStrategy.STRICT_SPREAD:
- return "STRICT_SPREAD"
- elif strategy == PlacementStrategy.SPREAD:
- return "SPREAD"
- else:
- raise ValueError(f"Invalid strategy returned: {PlacementStrategy}")
- stats = placement_group_info.stats
- assert placement_group_info is not None
- return {
- "placement_group_id": binary_to_hex(
- placement_group_info.placement_group_id
- ),
- "name": placement_group_info.name,
- "bundles": {
- # The value here is needs to be dictionarified
- # otherwise, the payload becomes unserializable.
- bundle.bundle_id.bundle_index: message_to_dict(bundle)["unitResources"]
- for bundle in placement_group_info.bundles
- },
- "bundles_to_node_id": {
- bundle.bundle_id.bundle_index: binary_to_hex(bundle.node_id)
- for bundle in placement_group_info.bundles
- },
- "strategy": get_strategy(placement_group_info.strategy),
- "state": get_state(placement_group_info.state),
- "stats": {
- "end_to_end_creation_latency_ms": (
- stats.end_to_end_creation_latency_us / 1000.0
- ),
- "scheduling_latency_ms": (stats.scheduling_latency_us / 1000.0),
- "scheduling_attempt": stats.scheduling_attempt,
- "highest_retry_delay_ms": stats.highest_retry_delay_ms,
- "scheduling_state": gcs_pb2.PlacementGroupStats.SchedulingState.DESCRIPTOR.values_by_number[ # noqa: E501
- stats.scheduling_state
- ].name,
- },
- }
- def _nanoseconds_to_microseconds(self, time_in_nanoseconds):
- """A helper function for converting nanoseconds to microseconds."""
- time_in_microseconds = time_in_nanoseconds / 1000
- return time_in_microseconds
- # Colors are specified at
- # https://github.com/catapult-project/catapult/blob/master/tracing/tracing/base/color_scheme.html. # noqa: E501
- _default_color_mapping = defaultdict(
- lambda: "generic_work",
- {
- "worker_idle": "cq_build_abandoned",
- "task": "rail_response",
- "task:deserialize_arguments": "rail_load",
- "task:execute": "rail_animation",
- "task:store_outputs": "rail_idle",
- "wait_for_function": "detailed_memory_dump",
- "ray.get": "good",
- "ray.put": "terrible",
- "ray.wait": "vsync_highlight_color",
- "submit_task": "background_memory_dump",
- "fetch_and_run_function": "detailed_memory_dump",
- "register_remote_function": "detailed_memory_dump",
- },
- )
- # These colors are for use in Chrome tracing.
- _chrome_tracing_colors = [
- "thread_state_uninterruptible",
- "thread_state_iowait",
- "thread_state_running",
- "thread_state_runnable",
- "thread_state_sleeping",
- "thread_state_unknown",
- "background_memory_dump",
- "light_memory_dump",
- "detailed_memory_dump",
- "vsync_highlight_color",
- "generic_work",
- "good",
- "bad",
- "terrible",
- # "black",
- # "grey",
- # "white",
- "yellow",
- "olive",
- "rail_response",
- "rail_animation",
- "rail_idle",
- "rail_load",
- "startup",
- "heap_dump_stack_frame",
- "heap_dump_object_type",
- "heap_dump_child_node_arrow",
- "cq_build_running",
- "cq_build_passed",
- "cq_build_failed",
- "cq_build_abandoned",
- "cq_build_attempt_runnig",
- "cq_build_attempt_passed",
- "cq_build_attempt_failed",
- ]
- def chrome_tracing_dump(self, filename=None):
- """Return a list of profiling events that can viewed as a timeline.
- To view this information as a timeline, simply dump it as a json file
- by passing in "filename" or using using json.dump, and then load go to
- chrome://tracing in the Chrome web browser and load the dumped file.
- Make sure to enable "Flow events" in the "View Options" menu.
- Args:
- filename: If a filename is provided, the timeline is dumped to that
- file.
- Returns:
- If filename is not provided, this returns a list of profiling
- events. Each profile event is a dictionary.
- """
- # TODO(rkn): Support including the task specification data in the
- # timeline.
- # TODO(rkn): This should support viewing just a window of time or a
- # limited number of events.
- self._connect_and_get_accessor()
- # Add a small delay to account for propagation delay of events to the GCS.
- # This should be harmless enough but prevents calls to timeline() from
- # missing recent timeline data.
- import time
- time.sleep(1)
- profile_events = self.profile_events()
- all_events = []
- for component_id_hex, component_events in profile_events.items():
- # Only consider workers and drivers.
- component_type = component_events[0]["component_type"]
- if component_type not in ["worker", "driver"]:
- continue
- for event in component_events:
- new_event = {
- # The category of the event.
- "cat": event["event_type"],
- # The string displayed on the event.
- "name": event["event_type"],
- # The identifier for the group of rows that the event
- # appears in.
- "pid": event["node_ip_address"],
- # The identifier for the row that the event appears in.
- "tid": event["component_type"] + ":" + event["component_id"],
- # The start time in microseconds.
- "ts": self._nanoseconds_to_microseconds(event["start_time"]),
- # The duration in microseconds.
- "dur": self._nanoseconds_to_microseconds(
- event["end_time"] - event["start_time"]
- ),
- # What is this?
- "ph": "X",
- # This is the name of the color to display the box in.
- "cname": self._default_color_mapping[event["event_type"]],
- # The extra user-defined data.
- "args": event["extra_data"],
- }
- # Modify the json with the additional user-defined extra data.
- # This can be used to add fields or override existing fields.
- if "cname" in event["extra_data"]:
- new_event["cname"] = event["extra_data"]["cname"]
- if "name" in event["extra_data"]:
- new_event["name"] = event["extra_data"]["name"]
- all_events.append(new_event)
- if not all_events:
- logger.warning(
- "No profiling events found. Ray profiling must be enabled "
- "by setting RAY_PROFILING=1, and make sure "
- "RAY_task_events_report_interval_ms=0."
- )
- if filename is not None:
- with open(filename, "w") as outfile:
- json.dump(all_events, outfile)
- else:
- return all_events
- def chrome_tracing_object_transfer_dump(self, filename=None):
- """Return a list of transfer events that can viewed as a timeline.
- To view this information as a timeline, simply dump it as a json file
- by passing in "filename" or using json.dump, and then load go to
- chrome://tracing in the Chrome web browser and load the dumped file.
- Make sure to enable "Flow events" in the "View Options" menu.
- Args:
- filename: If a filename is provided, the timeline is dumped to that
- file.
- Returns:
- If filename is not provided, this returns a list of profiling
- events. Each profile event is a dictionary.
- """
- self._connect_and_get_accessor()
- node_id_to_address = {}
- for node_info in self.node_table():
- node_id_to_address[node_info["NodeID"]] = "{}:{}".format(
- node_info["NodeManagerAddress"], node_info["ObjectManagerPort"]
- )
- all_events = []
- for key, items in self.profile_events().items():
- # Only consider object manager events.
- if items[0]["component_type"] != "object_manager":
- continue
- for event in items:
- if event["event_type"] == "transfer_send":
- object_ref, remote_node_id, _, _ = event["extra_data"]
- elif event["event_type"] == "transfer_receive":
- object_ref, remote_node_id, _ = event["extra_data"]
- elif event["event_type"] == "receive_pull_request":
- object_ref, remote_node_id = event["extra_data"]
- else:
- assert False, "This should be unreachable."
- # Choose a color by reading the first couple of hex digits of
- # the object ref as an integer and turning that into a color.
- object_ref_int = int(object_ref[:2], 16)
- color = self._chrome_tracing_colors[
- object_ref_int % len(self._chrome_tracing_colors)
- ]
- new_event = {
- # The category of the event.
- "cat": event["event_type"],
- # The string displayed on the event.
- "name": event["event_type"],
- # The identifier for the group of rows that the event
- # appears in.
- "pid": node_id_to_address[key],
- # The identifier for the row that the event appears in.
- "tid": node_id_to_address[remote_node_id],
- # The start time in microseconds.
- "ts": self._nanoseconds_to_microseconds(event["start_time"]),
- # The duration in microseconds.
- "dur": self._nanoseconds_to_microseconds(
- event["end_time"] - event["start_time"]
- ),
- # What is this?
- "ph": "X",
- # This is the name of the color to display the box in.
- "cname": color,
- # The extra user-defined data.
- "args": event["extra_data"],
- }
- all_events.append(new_event)
- # Add another box with a color indicating whether it was a send
- # or a receive event.
- if event["event_type"] == "transfer_send":
- additional_event = new_event.copy()
- additional_event["cname"] = "black"
- all_events.append(additional_event)
- elif event["event_type"] == "transfer_receive":
- additional_event = new_event.copy()
- additional_event["cname"] = "grey"
- all_events.append(additional_event)
- else:
- pass
- if filename is not None:
- with open(filename, "w") as outfile:
- json.dump(all_events, outfile)
- else:
- return all_events
- def workers(self):
- """Get a dictionary mapping worker ID to worker information."""
- accessor = self._connect_and_get_accessor()
- # Get all data in worker table
- worker_table = accessor.get_worker_table()
- workers_data = {}
- for i in range(len(worker_table)):
- worker_table_data = gcs_pb2.WorkerTableData.FromString(worker_table[i])
- if (
- worker_table_data.is_alive
- and worker_table_data.worker_type == common_pb2.WORKER
- ):
- worker_id = binary_to_hex(worker_table_data.worker_address.worker_id)
- worker_info = worker_table_data.worker_info
- workers_data[worker_id] = {
- "node_ip_address": decode(worker_info[b"node_ip_address"]),
- "plasma_store_socket": decode(worker_info[b"plasma_store_socket"]),
- }
- if b"stderr_file" in worker_info:
- workers_data[worker_id]["stderr_file"] = decode(
- worker_info[b"stderr_file"]
- )
- if b"stdout_file" in worker_info:
- workers_data[worker_id]["stdout_file"] = decode(
- worker_info[b"stdout_file"]
- )
- return workers_data
- def add_worker(self, worker_id, worker_type, worker_info):
- """Add a worker to the cluster.
- Args:
- worker_id: ID of this worker. Type is bytes.
- worker_type: Type of this worker. Value is common_pb2.DRIVER or
- common_pb2.WORKER.
- worker_info: Info of this worker. Type is dict{str: str}.
- Returns:
- Is operation success
- """
- accessor = self._connect_and_get_accessor()
- worker_data = gcs_pb2.WorkerTableData()
- worker_data.is_alive = True
- worker_data.worker_address.worker_id = worker_id
- worker_data.worker_type = worker_type
- for k, v in worker_info.items():
- worker_data.worker_info[k] = bytes(v, encoding="utf-8")
- return accessor.add_worker_info(worker_data.SerializeToString())
- def update_worker_debugger_port(self, worker_id, debugger_port):
- """Update the debugger port of a worker.
- Args:
- worker_id: ID of this worker. Type is bytes.
- debugger_port: Port of the debugger. Type is int.
- Returns:
- Is operation success
- """
- accessor = self._connect_and_get_accessor()
- assert worker_id is not None, "worker_id is not valid"
- assert (
- debugger_port is not None and debugger_port > 0
- ), "debugger_port is not valid"
- return accessor.update_worker_debugger_port(worker_id, debugger_port)
- def get_worker_debugger_port(self, worker_id):
- """Get the debugger port of a worker.
- Args:
- worker_id: ID of this worker. Type is bytes.
- Returns:
- Debugger port of the worker.
- """
- accessor = self._connect_and_get_accessor()
- assert worker_id is not None, "worker_id is not valid"
- return accessor.get_worker_debugger_port(worker_id)
- def update_worker_num_paused_threads(self, worker_id, num_paused_threads_delta):
- """Updates the number of paused threads of a worker.
- Args:
- worker_id: ID of this worker. Type is bytes.
- num_paused_threads_delta: The delta of the number of paused threads.
- Returns:
- Is operation success
- """
- accessor = self._connect_and_get_accessor()
- assert worker_id is not None, "worker_id is not valid"
- assert num_paused_threads_delta is not None, "worker_id is not valid"
- return accessor.update_worker_num_paused_threads(
- worker_id, num_paused_threads_delta
- )
- def cluster_resources(self):
- """Get the current total cluster resources.
- Note that this information can grow stale as nodes are added to or
- removed from the cluster.
- Returns:
- A dictionary mapping resource name to the total quantity of that
- resource in the cluster.
- """
- self._connect_and_get_accessor()
- # Calculate total resources.
- total_resources = defaultdict(int)
- for node_total_resources in self.total_resources_per_node().values():
- for resource_id, value in node_total_resources.items():
- total_resources[resource_id] += value
- return dict(total_resources)
- def _live_node_ids(self):
- """Returns a set of node IDs corresponding to nodes still alive."""
- return set(self.total_resources_per_node().keys())
- def available_resources_per_node(self):
- """Returns a dictionary mapping node id to available resources."""
- accessor = self._connect_and_get_accessor()
- available_resources_by_id = {}
- all_available_resources = accessor.get_all_available_resources()
- for available_resource in all_available_resources:
- message = gcs_pb2.AvailableResources.FromString(available_resource)
- # Calculate available resources for this node.
- dynamic_resources = {}
- for resource_id, capacity in message.resources_available.items():
- dynamic_resources[resource_id] = capacity
- # Update available resources for this node.
- node_id = ray._common.utils.binary_to_hex(message.node_id)
- available_resources_by_id[node_id] = dynamic_resources
- return available_resources_by_id
- # returns a dict that maps node_id(hex string) to a dict of {resource_id: capacity}
- def total_resources_per_node(self) -> Dict[str, Dict[str, int]]:
- accessor = self._connect_and_get_accessor()
- total_resources_by_node = {}
- all_total_resources = accessor.get_all_total_resources()
- for node_total_resources in all_total_resources:
- message = gcs_pb2.TotalResources.FromString(node_total_resources)
- # Calculate total resources for this node.
- node_resources = {}
- for resource_id, capacity in message.resources_total.items():
- node_resources[resource_id] = capacity
- # Update total resources for this node.
- node_id = ray._common.utils.binary_to_hex(message.node_id)
- total_resources_by_node[node_id] = node_resources
- return total_resources_by_node
- def available_resources(self):
- """Get the current available cluster resources.
- This is different from `cluster_resources` in that this will return
- idle (available) resources rather than total resources.
- Note that this information can grow stale as tasks start and finish.
- Returns:
- A dictionary mapping resource name to the total quantity of that
- resource in the cluster. Note that if a resource (e.g., "CPU")
- is currently not available (i.e., quantity is 0), it will not
- be included in this dictionary.
- """
- self._connect_and_get_accessor()
- available_resources_by_id = self.available_resources_per_node()
- # Calculate total available resources.
- total_available_resources = defaultdict(int)
- for available_resources in available_resources_by_id.values():
- for resource_id, num_available in available_resources.items():
- total_available_resources[resource_id] += num_available
- return dict(total_available_resources)
- def get_system_config(self):
- """Get the system config of the cluster."""
- accessor = self._connect_and_get_accessor()
- return json.loads(accessor.get_system_config())
- def get_node(self, node_id: str):
- """Get the node information for a node id."""
- accessor = self._connect_and_get_accessor()
- return accessor.get_node(node_id)
- def get_draining_nodes(self) -> Dict[str, int]:
- """Get all the hex ids of nodes that are being drained
- and the corresponding draining deadline timestamps in ms.
- There is no deadline if the timestamp is 0.
- """
- accessor = self._connect_and_get_accessor()
- return accessor.get_draining_nodes()
- def get_cluster_config(self) -> autoscaler_pb2.ClusterConfig:
- """Get the cluster config of the current cluster."""
- accessor = self._connect_and_get_accessor()
- serialized_cluster_config = accessor.get_internal_kv(
- ray._raylet.GCS_AUTOSCALER_STATE_NAMESPACE.encode(),
- ray._raylet.GCS_AUTOSCALER_CLUSTER_CONFIG_KEY.encode(),
- )
- if serialized_cluster_config:
- return autoscaler_pb2.ClusterConfig.FromString(serialized_cluster_config)
- return None
- @staticmethod
- def _calculate_max_resource_from_cluster_config(
- cluster_config: Optional[autoscaler_pb2.ClusterConfig], key: str
- ) -> Optional[int]:
- """Calculate the maximum available resources for a given resource type from cluster config.
- If the resource type is not available, return None.
- """
- if cluster_config is None:
- return None
- max_value = 0
- for node_group_config in cluster_config.node_group_configs:
- num_resources = node_group_config.resources.get(key, default=0)
- num_nodes = node_group_config.max_count
- if num_nodes == 0 or num_resources == 0:
- continue
- if num_nodes == -1 or num_resources == -1:
- return sys.maxsize
- max_value += num_nodes * num_resources
- if max_value == 0:
- return None
- max_value_limit = cluster_config.max_resources.get(key, default=sys.maxsize)
- return min(max_value, max_value_limit)
def get_max_resources_from_cluster_config(self) -> Optional[Dict[str, int]]:
    """Report, per resource type, the upper bound implied by the cluster config.

    Every resource name that appears in at least one node group's
    ``resources`` map is included. Its value comes from
    ``_calculate_max_resource_from_cluster_config``, with a None result
    reported as 0.

    Returns:
        A dict mapping each resource name to its maximum possible quantity,
        or None if the cluster config is unavailable or no node group
        declares any resources.
    """
    config = self.get_cluster_config()
    if config is None:
        return None

    # A falsy node_group_configs contributes no resource names.
    resource_names = {
        name
        for group in (config.node_group_configs or ())
        for name in group.resources.keys()
    }
    if not resource_names:
        return None

    maxima = {}
    for name in resource_names:
        upper_bound = self._calculate_max_resource_from_cluster_config(config, name)
        maxima[name] = 0 if upper_bound is None else upper_bound
    return maxima
def get_actor_info(self, actor_id: ray.ActorID) -> Optional[str]:
    """Look up the stored information for a single actor.

    Args:
        actor_id: ID of the actor to look up.

    Returns:
        Whatever the connected accessor's ``get_actor_info`` returns for
        this actor.
    """
    return self._connect_and_get_accessor().get_actor_info(actor_id)
# Module-level GlobalState instance. The public helper functions below
# (jobs(), nodes(), cluster_resources(), ...) all delegate to it.
state = GlobalState()
"""A global object used to access the cluster's global state."""
def jobs():
    """Return the cluster's job table (intended for debugging only).

    Returns:
        The job table from the global state: a list of dicts, each with keys
        - "JobID": identifier of the job,
        - "DriverIPAddress": IP address of the job's driver,
        - "DriverPid": process ID of the job's driver,
        - "StartTime": UNIX timestamp at which the job started,
        - "StopTime": UNIX timestamp at which the job stopped, if any.
    """
    job_table = state.job_table()
    return job_table
def next_job_id():
    """Ask the GCS for the job ID that will be assigned next.

    Returns:
        The next job ID in the cluster, as an integer.
    """
    upcoming_id = state.next_job_id()
    return upcoming_id
@DeveloperAPI
@client_mode_hook
def nodes():
    """List the nodes in the cluster (intended for debugging only).

    Returns:
        The node table from the global state, describing the Ray clients in
        the cluster.
    """
    node_table = state.node_table()
    return node_table
def workers():
    """List the workers in the cluster.

    Returns:
        Information about the Ray workers, as reported by the global state.
    """
    worker_info = state.workers()
    return worker_info
def current_node_id():
    """Build the node-ID resource name for the node this process runs on.

    The ID is ``NODE_ID_PREFIX`` followed by this node's IP address, e.g.
    "node:172.10.5.34". It can be used as a custom resource: ``{node_id: 1}``
    reserves the whole node, while ``{node_id: 0.001}`` merely forces
    placement on it.

    Returns:
        The node ID of the current node.
    """
    node_ip = ray.util.get_node_ip_address()
    return NODE_ID_PREFIX + node_ip
def node_ids():
    """Collect the node-ID resource names of every node in the cluster.

    These are the resource names starting with ``NODE_ID_PREFIX`` (e.g.
    ["node:172.10.5.34", "node:172.42.3.77"]), excluding
    ``HEAD_NODE_RESOURCE_NAME``. Each can be used as a custom resource:
    ``{node_id: 1}`` reserves the whole node, while ``{node_id: 0.001}``
    merely forces placement on it.

    Returns:
        A list of node resource IDs, in per-node iteration order.
    """
    return [
        resource_name
        for per_node_totals in state.total_resources_per_node().values()
        for resource_name in per_node_totals.keys()
        if resource_name.startswith(NODE_ID_PREFIX)
        and resource_name != HEAD_NODE_RESOURCE_NAME
    ]
def actors(
    actor_id: Optional[str] = None,
    job_id: Optional[ray.JobID] = None,
    actor_state_name: Optional[str] = None,
):
    """Fetch information about one or more actors (intended for debugging only).

    Args:
        actor_id: Hex string ID of a single actor to fetch. If None,
            information for all actors is fetched. When it is given,
            ``job_id`` and ``actor_state_name`` have no effect.
        job_id: Restrict results to actors of this job (a ``ray.JobID``).
            The current job ID can be obtained through
            ``ray.get_runtime_context().job_id``.
        actor_state_name: Restrict results to actors in this state; one of
            "DEPENDENCIES_UNREADY", "PENDING_CREATION", "ALIVE",
            "RESTARTING", or "DEAD".

    Returns:
        Information about the matching actors.
    """
    actor_info = state.actor_table(
        actor_id=actor_id,
        job_id=job_id,
        actor_state_name=actor_state_name,
    )
    return actor_info
@DeveloperAPI
@client_mode_hook
def timeline(filename=None):
    """Return a list of profiling events that can be viewed as a timeline.

    Ray profiling must be enabled before Ray starts by setting the
    RAY_PROFILING=1 environment variable, along with
    RAY_task_events_report_interval_ms=0.

    To view the events as a timeline, dump them to a JSON file (by passing
    ``filename`` or by using ``json.dump``), open chrome://tracing in the
    Chrome web browser, and load the dumped file there.

    Args:
        filename: If provided, the timeline is dumped to this file.

    Returns:
        If ``filename`` is not provided, a list of profiling events, each of
        which is a dictionary.
    """
    events = state.chrome_tracing_dump(filename=filename)
    return events
def object_transfer_timeline(filename=None):
    """Return a list of object transfer events that can be viewed as a timeline.

    To view the events as a timeline, dump them to a JSON file (by passing
    ``filename`` or by using ``json.dump``), open chrome://tracing in the
    Chrome web browser, and load the dumped file there. Make sure "Flow
    events" is enabled in the "View Options" menu.

    Args:
        filename: If provided, the timeline is dumped to this file.

    Returns:
        If ``filename`` is not provided, a list of profiling events, each of
        which is a dictionary.
    """
    transfer_events = state.chrome_tracing_object_transfer_dump(filename=filename)
    return transfer_events
@DeveloperAPI
@client_mode_hook
def cluster_resources():
    """Return the total resources currently present in the cluster.

    This is a snapshot: it can become stale as nodes join or leave the
    cluster.

    Returns:
        A dict mapping each resource name to the total quantity of that
        resource in the cluster.
    """
    totals = state.cluster_resources()
    return totals
@DeveloperAPI
@client_mode_hook
def available_resources():
    """Return the cluster resources that are currently idle.

    Unlike ``cluster_resources``, which reports total capacity, this reports
    only the available (unused) portion. It is a snapshot and can become
    stale as tasks start and finish.

    Returns:
        A dict mapping each resource name to the quantity of that resource
        currently available in the cluster. A resource with no available
        quantity (e.g. "CPU" when all CPUs are busy) is omitted.
    """
    idle = state.available_resources()
    return idle
@DeveloperAPI
def available_resources_per_node():
    """Return the currently available resources of every live node.

    This is a snapshot and can become stale as tasks start and finish.

    Returns:
        A dict mapping each node's hex ID to that node's available-resources
        dict.
    """
    per_node_available = state.available_resources_per_node()
    return per_node_available
@DeveloperAPI
def total_resources_per_node():
    """Return the total resources of every live node.

    This is a snapshot and can become stale over time.

    Returns:
        A dict mapping each node's hex ID to that node's total-resources
        dict.
    """
    per_node_totals = state.total_resources_per_node()
    return per_node_totals
def update_worker_debugger_port(worker_id, debugger_port):
    """Record the debugger port used by a worker.

    Args:
        worker_id: ID of the worker, as bytes.
        debugger_port: The debugger's port number, as an int.

    Returns:
        Whether the operation succeeded, as reported by the global state.
    """
    succeeded = state.update_worker_debugger_port(worker_id, debugger_port)
    return succeeded
def update_worker_num_paused_threads(worker_id, num_paused_threads_delta):
    """Adjust a worker's count of paused threads by a delta.

    Args:
        worker_id: ID of the worker, as bytes.
        num_paused_threads_delta: Amount by which to change the number of
            paused threads.

    Returns:
        Whether the operation succeeded, as reported by the global state.
    """
    succeeded = state.update_worker_num_paused_threads(
        worker_id, num_paused_threads_delta
    )
    return succeeded
def get_worker_debugger_port(worker_id):
    """Look up the debugger port recorded for a worker.

    Args:
        worker_id: ID of the worker, as bytes.

    Returns:
        The worker's debugger port, as reported by the global state.
    """
    port = state.get_worker_debugger_port(worker_id)
    return port
|