| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- """This file defines the interface between the ray client worker
- and the overall ray module API.
- """
- import json
- import logging
- from concurrent.futures import Future
- from typing import TYPE_CHECKING, Any, Callable, List, Optional, Union
- from ray._common import ray_option_utils
- from ray.util.client.runtime_context import _ClientWorkerPropertyAPI
- if TYPE_CHECKING:
- from ray.actor import ActorClass
- from ray.core.generated.ray_client_pb2 import DataResponse
- from ray.remote_function import RemoteFunction
- from ray.util.client.common import ClientActorHandle, ClientObjectRef, ClientStub
- logger = logging.getLogger(__name__)
def _as_bytes(value):
    """Normalize a value to bytes when it is text.

    ``str`` inputs are encoded as UTF-8; any other input (``bytes``,
    ``None``, ...) is handed back untouched, so optional arguments such as
    an unset namespace stay ``None``.
    """
    return value.encode("utf-8") if isinstance(value, str) else value
class _ClientAPI:
    """The Client-side methods corresponding to the ray API. Delegates
    to the Client Worker that contains the connection to the ClientServer.

    Nearly every public method is a thin hook that forwards its arguments to
    ``self.worker``. Looking up any other public attribute raises
    ``NotImplementedError`` (see ``__getattr__``).
    """

    def __init__(self, worker=None):
        # Every hook below delegates to this object without checking it, so
        # it must be set before any hook is called.
        self.worker = worker

    def get(self, vals, *, timeout=None):
        """get is the hook stub passed on to replace `ray.get`

        Args:
            vals: [Client]ObjectRef or list of these refs to retrieve.
            timeout: Optional maximum time to wait, in seconds (as for
                `ray.get`). Forwarded unchanged to the worker.
        """
        return self.worker.get(vals, timeout=timeout)

    def put(self, *args, **kwargs):
        """put is the hook stub passed on to replace `ray.put`

        Args:
            args: opaque positional arguments forwarded to the worker's
                ``put`` (presumably starting with the value to store).
            kwargs: opaque keyword arguments forwarded to the worker's ``put``.
        """
        return self.worker.put(*args, **kwargs)

    def wait(self, *args, **kwargs):
        """wait is the hook stub passed on to replace `ray.wait`

        Args:
            args: opaque arguments forwarded to the worker's ``wait``.
            kwargs: opaque keyword arguments forwarded to the worker's ``wait``.
        """
        return self.worker.wait(*args, **kwargs)

    def remote(self, *args, **kwargs):
        """remote is the hook stub passed on to replace `ray.remote`.

        This sets up remote functions or actors, as the decorator,
        but does not execute them. Two calling forms are accepted:
        bare ``@ray.remote`` (a single callable positional argument) and
        ``@ray.remote(**options)`` (keyword arguments only).

        Args:
            args: opaque arguments
            kwargs: opaque keyword arguments

        Raises:
            AssertionError: if the arguments match neither calling form.
        """
        # Delayed import to avoid a cyclic import
        from ray.util.client.common import remote_decorator

        if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
            # This is the case where the decorator is just @ray.remote.
            return remote_decorator(options=None)(args[0])
        # NOTE: validated with `assert`, so this check is skipped under
        # `python -O`.
        assert (
            len(args) == 0 and len(kwargs) > 0
        ), ray_option_utils.remote_args_error_string
        return remote_decorator(options=kwargs)

    # TODO(mwtian): consider adding _internal_ prefix to call_remote /
    # call_release / call_retain.
    def call_remote(self, instance: "ClientStub", *args, **kwargs) -> List[Future]:
        """call_remote is called by stub objects to execute them remotely.

        This is used by stub objects in situations where they're called
        with .remote, eg, `f.remote()` or `actor_cls.remote()`.
        This allows the client stub objects to delegate execution to be
        implemented in the most effective way whether it's in the client,
        clientserver, or raylet worker.

        Args:
            instance: The Client-side stub reference to a remote object
            args: opaque arguments
            kwargs: opaque keyword arguments

        Returns:
            Whatever the worker's ``call_remote`` returns (annotated as a
            list of futures).
        """
        return self.worker.call_remote(instance, *args, **kwargs)

    def call_release(self, id: bytes) -> None:
        """Attempts to release an object reference.

        When client references are destructed, they release their reference,
        which can opportunistically send a notification through the datachannel
        to release the reference being held for that object on the server.

        Args:
            id: The id of the reference to release on the server side.
        """
        return self.worker.call_release(id)

    def call_retain(self, id: bytes) -> None:
        """Attempts to retain a client object reference.

        Increments the reference count on the client side, to prevent
        the client worker from attempting to release the server reference.

        Args:
            id: The id of the reference to retain on the client side.
        """
        return self.worker.call_retain(id)

    def close(self) -> None:
        """close cleans up an API connection by closing any channels or
        shutting down any servers gracefully.
        """
        return self.worker.close()

    def get_actor(
        self, name: str, namespace: Optional[str] = None
    ) -> "ClientActorHandle":
        """Returns a handle to an actor by name.

        Args:
            name: The name passed to this actor by
                Actor.options(name="name").remote()
            namespace: Optional namespace to look the actor up in; forwarded
                unchanged to the worker.
        """
        return self.worker.get_actor(name, namespace)

    def list_named_actors(self, all_namespaces: bool = False) -> List[str]:
        """List all named actors in the system.

        Actors must have been created with Actor.options(name="name").remote().
        This works for both detached & non-detached actors.

        By default, only actors in the current namespace will be returned
        and the returned entries will simply be their name.

        If `all_namespaces` is set to True, all actors in the cluster will be
        returned regardless of namespace, and the returned entries will be of
        the form '<namespace>/<name>'.

        Args:
            all_namespaces: Whether to list actors from every namespace.
        """
        return self.worker.list_named_actors(all_namespaces)

    def kill(self, actor: "ClientActorHandle", *, no_restart=True):
        """kill forcibly stops an actor running in the cluster

        Args:
            actor: Handle of the actor to stop.
            no_restart: If True (the default), the actor should not be
                restarted even if it is a restartable actor. Forwarded to
                the worker's ``terminate_actor``.
        """
        return self.worker.terminate_actor(actor, no_restart)

    def cancel(self, obj: "ClientObjectRef", *, force=False, recursive=True):
        """Cancels a task on the cluster.

        If the specified task is pending execution, it will not be executed. If
        the task is currently executing, the behavior depends on the ``force``
        flag, as per `ray.cancel()`

        Only non-actor tasks can be canceled. Canceled tasks will not be
        retried (max_retries will not be respected).

        Args:
            obj: ObjectRef returned by the task
                that should be canceled.
            force: Whether to force-kill a running task by killing
                the worker that is running the task.
            recursive: Whether to try to cancel tasks submitted by
                the task specified.
        """
        return self.worker.terminate_task(obj, force, recursive)

    # Various metadata methods for the client that are defined in the protocol.
    def is_initialized(self) -> bool:
        """True if our client is connected, and if the server is initialized.

        Returns:
            A boolean determining if the client is connected and
            server initialized.
        """
        return self.worker.is_initialized()

    def nodes(self):
        """Get a list of the nodes in the cluster (for debugging only).

        Returns:
            Information about the nodes in the cluster, as reported by the
            server for the ``NODES`` cluster-info query.
        """
        # This should be imported here, otherwise, it will error doc build.
        import ray.core.generated.ray_client_pb2 as ray_client_pb2

        return self.worker.get_cluster_info(ray_client_pb2.ClusterInfoType.NODES)

    def method(self, *args, **kwargs):
        """Annotate an actor method

        Args:
            num_returns: The number of object refs that should be returned by
                invocations of this actor method.
            concurrency_group: The concurrency group the method belongs to.

        Returns:
            A decorator that sets the corresponding ``__ray_*__`` attributes
            on the method and returns the same function object.

        Raises:
            AssertionError: if positional arguments are given, no keyword
                argument is given, or an unsupported keyword is used.
        """
        # NOTE: So this follows the same logic as in ray/actor.py::method()
        # The reason to duplicate it here is to simplify the client mode
        # redirection logic. As the annotated method gets pickled and sent to
        # the server from the client it carries this private variable, it
        # activates the same logic on the server side; so there's no need to
        # pass anything else. It's inside the class definition that becomes an
        # actor. Similar annotations would follow the same way.
        valid_kwargs = ["num_returns", "concurrency_group"]
        error_string = (
            "The @ray.method decorator must be applied using at least one of "
            f"the arguments in the list {valid_kwargs}, for example "
            "'@ray.method(num_returns=2)'."
        )
        assert len(args) == 0 and len(kwargs) > 0, error_string
        for key in kwargs:
            key_error_string = (
                f'Unexpected keyword argument to @ray.method: "{key}". The '
                f"supported keyword arguments are {valid_kwargs}"
            )
            assert key in valid_kwargs, key_error_string

        def annotate_method(method):
            # Attach only the options that were actually supplied.
            if "num_returns" in kwargs:
                method.__ray_num_returns__ = kwargs["num_returns"]
            if "concurrency_group" in kwargs:
                method.__ray_concurrency_group__ = kwargs["concurrency_group"]
            return method

        return annotate_method

    def cluster_resources(self):
        """Get the current total cluster resources.

        Note that this information can grow stale as nodes are added to or
        removed from the cluster.

        Returns:
            A dictionary mapping resource name to the total quantity of that
            resource in the cluster.
        """
        # This should be imported here, otherwise, it will error doc build.
        import ray.core.generated.ray_client_pb2 as ray_client_pb2

        return self.worker.get_cluster_info(
            ray_client_pb2.ClusterInfoType.CLUSTER_RESOURCES
        )

    def available_resources(self):
        """Get the current available cluster resources.

        This is different from `cluster_resources` in that this will return
        idle (available) resources rather than total resources.

        Note that this information can grow stale as tasks start and finish.

        Returns:
            A dictionary mapping resource name to the currently available
            quantity of that resource in the cluster.
        """
        # This should be imported here, otherwise, it will error doc build.
        import ray.core.generated.ray_client_pb2 as ray_client_pb2

        return self.worker.get_cluster_info(
            ray_client_pb2.ClusterInfoType.AVAILABLE_RESOURCES
        )

    def get_runtime_context(self):
        """Return a Ray RuntimeContext describing the state on the server

        Returns:
            A RuntimeContext wrapping a client making get_cluster_info calls.
        """
        return _ClientWorkerPropertyAPI(self.worker).build_runtime_context()

    def get_gpu_ids(self) -> list:
        """Return the GPU ids assigned to this process.

        The client process isn't assigned any GPUs, so this is always an
        empty list.
        """
        return []

    def timeline(self, filename: Optional[str] = None) -> Optional[List[Any]]:
        """Fetch the cluster timeline events from the server.

        The events include those generated by other clients using the same
        server (a warning is logged to that effect).

        Args:
            filename: If given, the events are written to this file as JSON
                and None is returned.

        Returns:
            The events when ``filename`` is None, otherwise None.
        """
        logger.warning(
            "Timeline will include events from other clients using this server."
        )
        # This should be imported here, otherwise, it will error doc build.
        import ray.core.generated.ray_client_pb2 as ray_client_pb2

        all_events = self.worker.get_cluster_info(
            ray_client_pb2.ClusterInfoType.TIMELINE
        )
        if filename is not None:
            with open(filename, "w") as outfile:
                json.dump(all_events, outfile)
        else:
            return all_events

    def _internal_kv_initialized(self) -> bool:
        """Hook for internal_kv._internal_kv_initialized."""
        # NOTE(edoakes): the kv is always initialized because we initialize it
        # manually in the proxier with a GCS client if Ray hasn't been
        # initialized yet.
        return True

    # The internal-KV hooks below UTF-8-encode any `str` key, value, prefix
    # or namespace via `_as_bytes`; bytes and None pass through unchanged.
    def _internal_kv_exists(
        self, key: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
    ) -> bool:
        """Hook for internal_kv._internal_kv_exists."""
        return self.worker.internal_kv_exists(
            _as_bytes(key), namespace=_as_bytes(namespace)
        )

    def _internal_kv_get(
        self, key: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
    ) -> bytes:
        """Hook for internal_kv._internal_kv_get."""
        return self.worker.internal_kv_get(
            _as_bytes(key), namespace=_as_bytes(namespace)
        )

    def _internal_kv_put(
        self,
        key: Union[str, bytes],
        value: Union[str, bytes],
        overwrite: bool = True,
        *,
        namespace: Optional[Union[str, bytes]] = None,
    ) -> bool:
        """Hook for internal_kv._internal_kv_put."""
        return self.worker.internal_kv_put(
            _as_bytes(key), _as_bytes(value), overwrite, namespace=_as_bytes(namespace)
        )

    def _internal_kv_del(
        self,
        key: Union[str, bytes],
        *,
        del_by_prefix: bool = False,
        namespace: Optional[Union[str, bytes]] = None,
    ) -> int:
        """Hook for internal_kv._internal_kv_del."""
        return self.worker.internal_kv_del(
            _as_bytes(key), del_by_prefix=del_by_prefix, namespace=_as_bytes(namespace)
        )

    def _internal_kv_list(
        self,
        prefix: Union[str, bytes],
        *,
        namespace: Optional[Union[str, bytes]] = None,
    ) -> List[bytes]:
        """Hook for internal_kv._internal_kv_list."""
        return self.worker.internal_kv_list(
            _as_bytes(prefix), namespace=_as_bytes(namespace)
        )

    def _pin_runtime_env_uri(self, uri: str, expiration_s: int) -> None:
        """Hook for internal_kv._pin_runtime_env_uri."""
        return self.worker.pin_runtime_env_uri(uri, expiration_s)

    def _convert_actor(self, actor: "ActorClass") -> str:
        """Register a ClientActorClass for the ActorClass and return a UUID"""
        return self.worker._convert_actor(actor)

    def _convert_function(self, func: "RemoteFunction") -> str:
        """Register a ClientRemoteFunc for the RemoteFunction and return a UUID"""
        return self.worker._convert_function(func)

    def _get_converted(self, key: str) -> "ClientStub":
        """Given a UUID, return the converted object"""
        return self.worker._get_converted(key)

    def _converted_key_exists(self, key: str) -> bool:
        """Check if a key UUID is present in the store of converted objects."""
        return self.worker._converted_key_exists(key)

    def __getattr__(self, key: str):
        """Fallback for attributes not found by normal lookup.

        Public names (no leading underscore) are `ray` APIs that the client
        does not implement, so a NotImplementedError naming the API is raised.
        Private names are retried through ``__getattribute__``, which normally
        fails again with AttributeError, keeping ``hasattr``/``getattr`` with a
        default working for them.
        """
        if not key.startswith("_"):
            raise NotImplementedError(
                "Not available in Ray client: `ray.{}`. This method is only "
                "available within Ray remote functions and is not yet "
                "implemented in the client API.".format(key)
            )
        return self.__getattribute__(key)

    def _register_callback(
        self, ref: "ClientObjectRef", callback: Callable[["DataResponse"], None]
    ) -> None:
        """Register ``callback`` with the worker for ``ref``.

        Per its annotation the callback receives a ``DataResponse``;
        presumably it fires when the response for ``ref`` arrives — confirm
        against the worker's ``register_callback``.
        """
        self.worker.register_callback(ref, callback)

    def _get_dashboard_url(self) -> str:
        """Return the dashboard URL reported by the server, or "" if absent."""
        # This should be imported here, otherwise, it will error doc build.
        import ray.core.generated.ray_client_pb2 as ray_client_pb2

        return self.worker.get_cluster_info(
            ray_client_pb2.ClusterInfoType.DASHBOARD_URL
        ).get("dashboard_url", "")
|