| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- import logging
- from typing import Optional
- from ray._private import ray_constants
- from ray.core.generated.common_pb2 import ErrorType, JobConfig
- from ray.core.generated.gcs_pb2 import (
- ActorTableData,
- AvailableResources,
- ErrorTableData,
- GcsEntry,
- GcsNodeInfo,
- JobTableData,
- PlacementGroupTableData,
- PubSubMessage,
- ResourceDemand,
- ResourceLoad,
- ResourcesData,
- ResourceUsageBatchData,
- TablePrefix,
- TablePubsub,
- TaskEvents,
- TotalResources,
- WorkerTableData,
- )
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Public API of this module: exactly the protobuf-generated names imported
# above from common_pb2 and gcs_pb2, re-exported for callers.
__all__ = [
    "ActorTableData",
    "GcsNodeInfo",
    "AvailableResources",
    "TotalResources",
    "JobTableData",
    "JobConfig",
    "ErrorTableData",
    "ErrorType",
    "GcsEntry",
    "ResourceUsageBatchData",
    "ResourcesData",
    "TablePrefix",
    "TablePubsub",
    "TaskEvents",
    "ResourceDemand",
    "ResourceLoad",
    "PubSubMessage",
    "WorkerTableData",
    "PlacementGroupTableData",
]
# NOTE(review): presumably integer tags distinguishing worker and driver
# processes -- confirm against the callers that consume them.
WORKER = 0
DRIVER = 1

# Largest gRPC message allowed in either direction: 512MB, in bytes.
_MAX_MESSAGE_LENGTH = 512 * 1024**2

# A keepalive ping is sent every 60s ...
_GRPC_KEEPALIVE_TIME_MS = 60 * 1000
# ... and must be answered within 60s.
_GRPC_KEEPALIVE_TIMEOUT_MS = 60 * 1000

# Options for every GCS channel: the global Ray gRPC options first, followed
# by the message-size caps and keepalive settings defined above.
# These gRPC defaults are also relied on and intentionally left unset:
#   grpc.keepalive_permit_without_calls=0: No keepalive without inflight calls.
#   grpc.use_local_subchannel_pool=0: Subchannels are shared.
_GRPC_OPTIONS = list(ray_constants.GLOBAL_GRPC_OPTIONS) + [
    ("grpc.max_send_message_length", _MAX_MESSAGE_LENGTH),
    ("grpc.max_receive_message_length", _MAX_MESSAGE_LENGTH),
    ("grpc.keepalive_time_ms", _GRPC_KEEPALIVE_TIME_MS),
    ("grpc.keepalive_timeout_ms", _GRPC_KEEPALIVE_TIMEOUT_MS),
]
def create_gcs_channel(address: str, aio=False):
    """Open a gRPC channel to the GCS server.

    The channel is configured with the module-level ``_GRPC_OPTIONS``.

    Args:
        address: GCS address string, e.g. ip:port
        aio: Whether to create a grpc.aio channel instead of a blocking one.

    Returns:
        grpc.Channel or grpc.aio.Channel to GCS
    """
    # Resolved at call time rather than at module import time.
    from ray._private import grpc_utils

    return grpc_utils.init_grpc_channel(
        address,
        options=_GRPC_OPTIONS,
        asynchronous=aio,
    )
class GcsChannel:
    """Pairs a GCS address with the gRPC channel opened to it.

    No channel exists until ``connect()`` has been called.
    """

    def __init__(self, gcs_address: Optional[str] = None, aio: bool = False):
        """Remember the target address and whether to use grpc.aio."""
        self._gcs_address, self._aio = gcs_address, aio

    @property
    def address(self):
        """The GCS address given at construction time (may be None)."""
        return self._gcs_address

    def connect(self):
        """Create a channel to the stored address and keep it on the instance."""
        # The GCS server binds to a cached port and therefore comes back on
        # the same port after a restart, so the GCS address is expected to
        # stay the same for the lifetime of the Ray cluster.
        new_channel = create_gcs_channel(self._gcs_address, self._aio)
        self._channel = new_channel

    def channel(self):
        """Return the channel created by ``connect()``.

        Raises:
            AttributeError: If ``connect()`` has not been called yet, because
                the channel attribute is only assigned there.
        """
        return self._channel
def cleanup_redis_storage(
    host: str,
    port: int,
    password: str,
    use_ssl: bool,
    storage_namespace: str,
    username: Optional[str] = None,
):
    """Clean up the GCS storage kept in Redis.

    It supports Redis in cluster and non-cluster modes.

    All arguments are validated before the native storage helper is imported,
    so invalid input is rejected without touching Redis.

    Args:
        host: The Redis host address.
        port: The Redis port; must be within 0-65535.
        username: The Redis username. ``None`` is treated as an empty string.
        password: The Redis password.
        use_ssl: Whether to encrypt the connection.
        storage_namespace: The namespace of the storage to be deleted.

    Returns:
        Whatever ``del_key_prefix_from_storage`` returns.

    Raises:
        ValueError: If ``host``, ``username``, ``password`` or
            ``storage_namespace`` is not a string, or if ``port`` is outside
            the valid port range.
        TypeError: If ``use_ssl`` is not a boolean.
    """
    if not isinstance(host, str):
        raise ValueError("Host must be a string")

    if username is None:
        username = ""
    if not isinstance(username, str):
        raise ValueError("Username must be a string")

    if not isinstance(password, str):
        raise ValueError("Password must be a string")

    # Port numbers are 16-bit: reject values above 65535 as well as negatives
    # instead of handing an unusable port to the native layer.
    if not 0 <= port <= 65535:
        raise ValueError(f"Invalid port: {port}")

    if not isinstance(use_ssl, bool):
        raise TypeError("use_ssl must be a boolean")

    if not isinstance(storage_namespace, str):
        raise ValueError("storage namespace must be a string")

    # Imported only after validation so that bad arguments fail fast without
    # loading the native extension.
    from ray._raylet import del_key_prefix_from_storage  # type: ignore

    # Right now, GCS stores all data in multiple hashes with keys prefixed by
    # storage_namespace. So we only need to delete the specific key prefix to
    # clean up the cluster's data.
    # Note this deletes all keys with prefix `RAY{key_prefix}@`, not `{key_prefix}`.
    return del_key_prefix_from_storage(
        host, port, username, password, use_ssl, storage_namespace
    )
|