gcs_utils.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import logging
  2. from typing import Optional
  3. from ray._private import ray_constants
  4. from ray.core.generated.common_pb2 import ErrorType, JobConfig
  5. from ray.core.generated.gcs_pb2 import (
  6. ActorTableData,
  7. AvailableResources,
  8. ErrorTableData,
  9. GcsEntry,
  10. GcsNodeInfo,
  11. JobTableData,
  12. PlacementGroupTableData,
  13. PubSubMessage,
  14. ResourceDemand,
  15. ResourceLoad,
  16. ResourcesData,
  17. ResourceUsageBatchData,
  18. TablePrefix,
  19. TablePubsub,
  20. TaskEvents,
  21. TotalResources,
  22. WorkerTableData,
  23. )
  24. logger = logging.getLogger(__name__)
  25. __all__ = [
  26. "ActorTableData",
  27. "GcsNodeInfo",
  28. "AvailableResources",
  29. "TotalResources",
  30. "JobTableData",
  31. "JobConfig",
  32. "ErrorTableData",
  33. "ErrorType",
  34. "GcsEntry",
  35. "ResourceUsageBatchData",
  36. "ResourcesData",
  37. "TablePrefix",
  38. "TablePubsub",
  39. "TaskEvents",
  40. "ResourceDemand",
  41. "ResourceLoad",
  42. "PubSubMessage",
  43. "WorkerTableData",
  44. "PlacementGroupTableData",
  45. ]
  46. WORKER = 0
  47. DRIVER = 1
  48. # Cap messages at 512MB
  49. _MAX_MESSAGE_LENGTH = 512 * 1024 * 1024
  50. # Send keepalive every 60s
  51. _GRPC_KEEPALIVE_TIME_MS = 60 * 1000
  52. # Keepalive should be replied < 60s
  53. _GRPC_KEEPALIVE_TIMEOUT_MS = 60 * 1000
  54. # Also relying on these defaults:
  55. # grpc.keepalive_permit_without_calls=0: No keepalive without inflight calls.
  56. # grpc.use_local_subchannel_pool=0: Subchannels are shared.
  57. _GRPC_OPTIONS = [
  58. *ray_constants.GLOBAL_GRPC_OPTIONS,
  59. ("grpc.max_send_message_length", _MAX_MESSAGE_LENGTH),
  60. ("grpc.max_receive_message_length", _MAX_MESSAGE_LENGTH),
  61. ("grpc.keepalive_time_ms", _GRPC_KEEPALIVE_TIME_MS),
  62. ("grpc.keepalive_timeout_ms", _GRPC_KEEPALIVE_TIMEOUT_MS),
  63. ]
  64. def create_gcs_channel(address: str, aio=False):
  65. """Returns a GRPC channel to GCS.
  66. Args:
  67. address: GCS address string, e.g. ip:port
  68. aio: Whether using grpc.aio
  69. Returns:
  70. grpc.Channel or grpc.aio.Channel to GCS
  71. """
  72. from ray._private.grpc_utils import init_grpc_channel
  73. return init_grpc_channel(address, options=_GRPC_OPTIONS, asynchronous=aio)
  74. class GcsChannel:
  75. def __init__(self, gcs_address: Optional[str] = None, aio: bool = False):
  76. self._gcs_address = gcs_address
  77. self._aio = aio
  78. @property
  79. def address(self):
  80. return self._gcs_address
  81. def connect(self):
  82. # GCS server uses a cached port, so it should use the same port after
  83. # restarting. This means GCS address should stay the same for the
  84. # lifetime of the Ray cluster.
  85. self._channel = create_gcs_channel(self._gcs_address, self._aio)
  86. def channel(self):
  87. return self._channel
  88. def cleanup_redis_storage(
  89. host: str,
  90. port: int,
  91. password: str,
  92. use_ssl: bool,
  93. storage_namespace: str,
  94. username: Optional[str] = None,
  95. ):
  96. """This function is used to cleanup the GCS storage in Redis.
  97. It supports Redis in cluster and non-cluster modes.
  98. Args:
  99. host: The Redis host address.
  100. port: The Redis port.
  101. username: The Redis username.
  102. password: The Redis password.
  103. use_ssl: Whether to encrypt the connection.
  104. storage_namespace: The namespace of the storage to be deleted.
  105. """
  106. from ray._raylet import del_key_prefix_from_storage # type: ignore
  107. if not isinstance(host, str):
  108. raise ValueError("Host must be a string")
  109. if username is None:
  110. username = ""
  111. if not isinstance(username, str):
  112. raise ValueError("Username must be a string")
  113. if not isinstance(password, str):
  114. raise ValueError("Password must be a string")
  115. if port < 0:
  116. raise ValueError(f"Invalid port: {port}")
  117. if not isinstance(use_ssl, bool):
  118. raise TypeError("use_ssl must be a boolean")
  119. if not isinstance(storage_namespace, str):
  120. raise ValueError("storage namespace must be a string")
  121. # Right now, GCS stores all data in multiple hashes with keys prefixed by
  122. # storage_namespace. So we only need to delete the specific key prefix to cleanup
  123. # the cluster's data.
  124. # Note this deletes all keys with prefix `RAY{key_prefix}@`, not `{key_prefix}`.
  125. return del_key_prefix_from_storage(
  126. host, port, username, password, use_ssl, storage_namespace
  127. )