import logging
from typing import Optional

from ray._private import ray_constants
from ray.core.generated.common_pb2 import ErrorType, JobConfig
from ray.core.generated.gcs_pb2 import (
    ActorTableData,
    AvailableResources,
    ErrorTableData,
    GcsEntry,
    GcsNodeInfo,
    JobTableData,
    PlacementGroupTableData,
    PubSubMessage,
    ResourceDemand,
    ResourceLoad,
    ResourcesData,
    ResourceUsageBatchData,
    TablePrefix,
    TablePubsub,
    TaskEvents,
    TotalResources,
    WorkerTableData,
)

logger = logging.getLogger(__name__)

__all__ = [
    "ActorTableData",
    "GcsNodeInfo",
    "AvailableResources",
    "TotalResources",
    "JobTableData",
    "JobConfig",
    "ErrorTableData",
    "ErrorType",
    "GcsEntry",
    "ResourceUsageBatchData",
    "ResourcesData",
    "TablePrefix",
    "TablePubsub",
    "TaskEvents",
    "ResourceDemand",
    "ResourceLoad",
    "PubSubMessage",
    "WorkerTableData",
    "PlacementGroupTableData",
]

WORKER = 0
DRIVER = 1

# Cap messages at 512MB
_MAX_MESSAGE_LENGTH = 512 * 1024 * 1024
# Send keepalive every 60s
_GRPC_KEEPALIVE_TIME_MS = 60 * 1000
# Keepalive should be replied < 60s
_GRPC_KEEPALIVE_TIMEOUT_MS = 60 * 1000

# Also relying on these defaults:
# grpc.keepalive_permit_without_calls=0: No keepalive without inflight calls.
# grpc.use_local_subchannel_pool=0: Subchannels are shared.
_GRPC_OPTIONS = [
    *ray_constants.GLOBAL_GRPC_OPTIONS,
    ("grpc.max_send_message_length", _MAX_MESSAGE_LENGTH),
    ("grpc.max_receive_message_length", _MAX_MESSAGE_LENGTH),
    ("grpc.keepalive_time_ms", _GRPC_KEEPALIVE_TIME_MS),
    ("grpc.keepalive_timeout_ms", _GRPC_KEEPALIVE_TIMEOUT_MS),
]

# Largest valid TCP port number; used to validate the Redis port.
_MAX_PORT = 65535


def create_gcs_channel(address: str, aio: bool = False):
    """Returns a GRPC channel to GCS.

    The channel is created with the module-level ``_GRPC_OPTIONS`` (message
    size caps and keepalive settings on top of the global Ray gRPC options).

    Args:
        address: GCS address string, e.g. ip:port
        aio: Whether using grpc.aio

    Returns:
        grpc.Channel or grpc.aio.Channel to GCS
    """
    # Imported at call time, as in the original code.
    from ray._private.grpc_utils import init_grpc_channel

    return init_grpc_channel(address, options=_GRPC_OPTIONS, asynchronous=aio)


class GcsChannel:
    """Holds a GCS address and the gRPC channel created for it.

    The channel is not opened on construction; call ``connect()`` first and
    then retrieve it with ``channel()``.
    """

    def __init__(self, gcs_address: Optional[str] = None, aio: bool = False):
        """Store the connection parameters without opening a channel.

        Args:
            gcs_address: GCS address string, e.g. ip:port.
            aio: Whether the channel should be a grpc.aio channel.
        """
        self._gcs_address = gcs_address
        self._aio = aio
        # Declared here so the attribute always exists; set by connect().
        self._channel = None

    @property
    def address(self) -> Optional[str]:
        """The GCS address this object was constructed with."""
        return self._gcs_address

    def connect(self) -> None:
        """Create the gRPC channel to GCS and store it on this object."""
        # GCS server uses a cached port, so it should use the same port after
        # restarting. This means GCS address should stay the same for the
        # lifetime of the Ray cluster.
        self._channel = create_gcs_channel(self._gcs_address, self._aio)

    def channel(self):
        """Return the channel created by ``connect()``.

        Raises:
            AttributeError: If ``connect()` has not been called yet.
        """
        if self._channel is None:
            # AttributeError is kept on purpose: it is what callers got before
            # `_channel` was declared in __init__, now with a usable message.
            raise AttributeError(
                "GcsChannel has no channel yet; call connect() before channel()"
            )
        return self._channel


def cleanup_redis_storage(
    host: str,
    port: int,
    password: str,
    use_ssl: bool,
    storage_namespace: str,
    username: Optional[str] = None,
):
    """This function is used to cleanup the GCS storage in Redis.

    It supports Redis in cluster and non-cluster modes.

    Args:
        host: The Redis host address.
        port: The Redis port. Must be in the range 0-65535.
        password: The Redis password.
        use_ssl: Whether to encrypt the connection.
        storage_namespace: The namespace of the storage to be deleted.
        username: The Redis username. ``None`` is treated as an empty string.

    Returns:
        The result of ``del_key_prefix_from_storage`` (defined in
        ``ray._raylet``; its return value is not visible from this module).

    Raises:
        ValueError: If ``host``, ``username``, ``password`` or
            ``storage_namespace`` is not a string, or if ``port`` is outside
            the valid port range.
        TypeError: If ``use_ssl`` is not a boolean.
    """
    from ray._raylet import del_key_prefix_from_storage  # type: ignore

    if not isinstance(host, str):
        raise ValueError("Host must be a string")

    if username is None:
        username = ""

    if not isinstance(username, str):
        raise ValueError("Username must be a string")

    if not isinstance(password, str):
        raise ValueError("Password must be a string")

    # Reject ports above the TCP maximum as well as negative ones, so an
    # impossible port fails here instead of inside the storage client.
    if port < 0 or port > _MAX_PORT:
        raise ValueError(f"Invalid port: {port}")

    if not isinstance(use_ssl, bool):
        raise TypeError("use_ssl must be a boolean")

    if not isinstance(storage_namespace, str):
        raise ValueError("storage namespace must be a string")

    # Right now, GCS stores all data in multiple hashes with keys prefixed by
    # storage_namespace. So we only need to delete the specific key prefix to cleanup
    # the cluster's data.
    # Note this deletes all keys with prefix `RAY{key_prefix}@`, not `{key_prefix}`.
    return del_key_prefix_from_storage(
        host, port, username, password, use_ssl, storage_namespace
    )