__init__.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. import logging
  2. import os
  3. import threading
  4. from typing import Any, Dict, List, Optional, Tuple
  5. import ray._private.ray_constants as ray_constants
  6. from ray._private.client_mode_hook import (
  7. _explicitly_disable_client_mode,
  8. _explicitly_enable_client_mode,
  9. )
  10. from ray._private.ray_logging import setup_logger
  11. from ray._private.utils import check_version_info
  12. from ray.job_config import JobConfig
  13. from ray.util.annotations import DeveloperAPI
  14. logger = logging.getLogger(__name__)
  class _ClientContext:
      """Hold the state of one Ray Client connection.

      A context owns a client API object, the worker that talks to the
      server (``None`` while disconnected) and, when ``init()`` was used, the
      handle of the gRPC server started in this process.
      """

      def __init__(self):
          from ray.util.client.api import _ClientAPI

          self.api = _ClientAPI()
          # Worker for the active connection; None while disconnected.
          self.client_worker = None
          # gRPC server handle stored by init(); None if no server is held.
          self._server = None
          # True only while the active connection was created through init().
          self._connected_with_init = False
          self._inside_client_test = False

      def connect(
          self,
          conn_str: str,
          job_config: Optional["JobConfig"] = None,
          secure: bool = False,
          metadata: Optional[List[Tuple[str, str]]] = None,
          connection_retries: int = 3,
          namespace: Optional[str] = None,
          *,
          ignore_version: bool = False,
          _credentials: Optional["grpc.ChannelCredentials"] = None,  # noqa: F821
          ray_init_kwargs: Optional[Dict[str, Any]] = None,
      ) -> Optional[Dict[str, Any]]:
          """Connect the Ray Client to a server.

          Args:
              conn_str: Connection string, in the form "[host]:port"
              job_config: The job config of the server.
              secure: Whether to use a TLS secured gRPC channel
              metadata: gRPC metadata to send on connect
              connection_retries: number of connection attempts to make
              namespace: If not None, it is set as the Ray namespace on
                  ``job_config`` (a new ``JobConfig`` is created when none
                  was passed).
              ignore_version: whether to ignore Python or Ray version mismatches.
                  This should only be used for debugging purposes.
              _credentials: gRPC channel credentials handed to the worker.
              ray_init_kwargs: Extra keyword arguments handed to the worker's
                  ``_server_init``. Its "logging_level" and "logging_format"
                  entries, when not None, also configure the client logger.
                  The dict is copied; the caller's object is not modified.

          Returns:
              Dictionary of connection info, e.g., {"num_clients": 1}, or
              None when this context is already connected through ``init()``.

          Raises:
              Exception: If the context is already connected and that
                  connection was not made through ``init()``. Any error
                  raised while connecting is re-raised after the context has
                  been disconnected again.
          """
          # Delay imports until connect to avoid circular imports.
          from ray.util.client.worker import Worker

          if self.client_worker is not None:
              if self._connected_with_init:
                  return None
              raise Exception(
                  "ray.init() called, but ray client is already connected"
              )

          if not self._inside_client_test:
              # If we're calling a client connect specifically and we're not
              # currently in client mode, ensure we are.
              _explicitly_enable_client_mode()

          if namespace is not None:
              job_config = job_config or JobConfig()
              job_config.set_ray_namespace(namespace)

          # Copy before adding our own key so the caller's dict stays intact.
          ray_init_kwargs = dict(ray_init_kwargs) if ray_init_kwargs is not None else {}
          # NOTE(architkulkarni): env_hook is not supported with Ray Client.
          ray_init_kwargs["_skip_env_hook"] = True

          logging_level = ray_init_kwargs.get("logging_level")
          if logging_level is None:
              logging_level = ray_constants.LOGGER_LEVEL
          logging_format = ray_init_kwargs.get("logging_format")
          if logging_format is None:
              logging_format = ray_constants.LOGGER_FORMAT
          setup_logger(logging_level, logging_format)

          try:
              self.client_worker = Worker(
                  conn_str,
                  secure=secure,
                  _credentials=_credentials,
                  metadata=metadata,
                  connection_retries=connection_retries,
              )
              self.api.worker = self.client_worker
              self.client_worker._server_init(job_config, ray_init_kwargs)
              conn_info = self.client_worker.connection_info()
              self._check_versions(conn_info, ignore_version)
              self._register_serializers()
              return conn_info
          except Exception:
              # Do not leave a half-initialized worker behind.
              self.disconnect()
              raise

      def _register_serializers(self):
          """Register the custom serializer addons at the client side.

          The server side should have already registered the serializers via
          regular worker's serialization_context mechanism.
          """
          import ray.util.serialization_addons
          from ray.util.serialization import StandaloneSerializationContext

          ctx = StandaloneSerializationContext()
          ray.util.serialization_addons.apply(ctx)

      def _check_versions(self, conn_info: Dict[str, Any], ignore_version: bool) -> None:
          """Compare client and server versions using ``check_version_info``.

          Args:
              conn_info: Connection info returned by the worker; it has
                  "python_version" and "ray_version" so it can be compared.
              ignore_version: When True, or when the environment variable
                  RAY_IGNORE_VERSION_MISMATCH is set (to any value),
                  ``raise_on_mismatch`` is passed as False.
          """
          ignore_version = ignore_version or ("RAY_IGNORE_VERSION_MISMATCH" in os.environ)
          check_version_info(
              conn_info,
              "Ray Client",
              raise_on_mismatch=not ignore_version,
              python_version_match_level="minor",
          )

      def disconnect(self):
          """Disconnect the Ray Client and reset the connection state."""
          from ray.util.client.api import _ClientAPI

          if self.client_worker is not None:
              self.client_worker.close()
          self.api = _ClientAPI()
          self.client_worker = None
          # The flag describes the active connection only. Leaving it set
          # would make a later duplicate connect() return None silently
          # instead of raising.
          self._connected_with_init = False

      # remote can be called outside of a connection, which is why it
      # exists on the same API layer as connect() itself.
      def remote(self, *args, **kwargs):
          """remote is the hook stub passed on to replace `ray.remote`.

          This sets up remote functions or actors, as the decorator,
          but does not execute them.

          Args:
              args: opaque arguments
              kwargs: opaque keyword arguments
          """
          return self.api.remote(*args, **kwargs)

      def __getattr__(self, key: str):
          """Forward unknown attributes to the client API while connected.

          While disconnected, ``is_initialized`` and
          ``_internal_kv_initialized`` resolve to callables returning False
          and every other name raises ``Exception``.
          """
          # is_connected() and the forwarding below read these two
          # attributes. If one is missing from the instance (e.g. it was
          # created without running __init__), falling through would recurse
          # without bound.
          if key in ("client_worker", "api") and key not in self.__dict__:
              raise AttributeError(key)
          if self.is_connected():
              return getattr(self.api, key)
          if key in ("is_initialized", "_internal_kv_initialized"):
              # Client is not connected, thus Ray is not considered initialized.
              return lambda: False
          raise Exception(
              "Ray Client is not connected. Please connect by calling `ray.init`."
          )

      def is_connected(self) -> bool:
          """Return whether a worker exists and reports itself connected."""
          if self.client_worker is None:
              return False
          return self.client_worker.is_connected()

      def init(self, *args, **kwargs):
          """Start a client server on 127.0.0.1:50051 and connect to it.

          Args:
              args: positional arguments forwarded to ``init_and_serve``
              kwargs: keyword arguments forwarded to ``init_and_serve``

          Returns:
              The address info returned by ``init_and_serve``.

          Raises:
              Exception: If this context already holds a server.
          """
          if self._server is not None:
              raise Exception("Trying to start two instances of ray via client")
          import ray.util.client.server.server as ray_client_server

          server_handle, address_info = ray_client_server.init_and_serve(
              "127.0.0.1", 50051, *args, **kwargs
          )
          self._server = server_handle.grpc_server
          self.connect("127.0.0.1:50051")
          self._connected_with_init = True
          return address_info

      def shutdown(self, _exiting_interpreter=False):
          """Disconnect and shut down the server stored by ``init()``, if any."""
          self.disconnect()
          if self._server is None:
              return
          # Imported only when there is a server to stop.
          import ray.util.client.server.server as ray_client_server

          ray_client_server.shutdown_with_server(self._server, _exiting_interpreter)
          self._server = None
  # Guards every read and write of `_all_contexts` for thread safety.
  _lock = threading.Lock()
  # Every connected context is put here; always access it under `_lock`.
  _all_contexts: set = set()
  # The context used whenever allow_multiple is not True.
  _default_context: _ClientContext = _ClientContext()
  @DeveloperAPI
  class RayAPIStub:
      """This class stands in as the replacement API for the `import ray` module.

      Much like the ray module, this mostly delegates the work to the
      _client_worker. As parts of the ray API are covered, they are piped through
      here or on the client worker API.

      The active context is stored in a ``threading.local``, so each thread
      has its own; a thread that has not set one gets the default context.
      """

      def __init__(self):
          self._cxt = threading.local()
          self._cxt.handler = _default_context
          self._inside_client_test = False

      def get_context(self):
          """Return the calling thread's context, defaulting to the default one."""
          try:
              return self._cxt.handler
          except AttributeError:
              # First access from this thread: the thread-local is still empty.
              self._cxt.handler = _default_context
              return self._cxt.handler

      def set_context(self, cxt):
          """Install ``cxt`` for the calling thread and return the previous one.

          Passing None installs a newly created ``_ClientContext``.
          """
          old_cxt = self.get_context()
          self._cxt.handler = _ClientContext() if cxt is None else cxt
          return old_cxt

      def is_default(self):
          """Return whether the calling thread uses the default context."""
          return self.get_context() is _default_context

      def connect(self, *args, **kw_args):
          """Connect the current context and add it to the set of contexts.

          All arguments are forwarded to the context's ``connect``; its
          result is returned.
          """
          cxt = self.get_context()
          cxt._inside_client_test = self._inside_client_test
          conn = cxt.connect(*args, **kw_args)
          with _lock:
              _all_contexts.add(cxt)
          return conn

      def _close_contexts(self, method_name, *args, **kwargs):
          """Shared body of ``disconnect`` and ``shutdown``.

          Calls the context method called ``method_name`` on every recorded
          context when the current context is the default one, otherwise on
          the current context only. It then removes the closed context(s)
          from the set and disables client mode once the set is empty.
          """
          global _all_contexts
          with _lock:
              current = self.get_context()
              if current is _default_context:
                  for cxt in _all_contexts:
                      getattr(cxt, method_name)(*args, **kwargs)
                  _all_contexts = set()
              else:
                  getattr(current, method_name)(*args, **kwargs)
              _all_contexts.discard(current)
              if not _all_contexts:
                  _explicitly_disable_client_mode()

      def disconnect(self, *args, **kw_args):
          """Disconnect the current context.

          If the current context is the default one, every recorded context
          is disconnected instead.
          """
          self._close_contexts("disconnect", *args, **kw_args)

      def remote(self, *args, **kwargs):
          """Forward to the current context's ``remote``."""
          return self.get_context().remote(*args, **kwargs)

      def __getattr__(self, name):
          """Resolve unknown attributes through the current context."""
          # get_context() reads self._cxt. If it is missing (instance made
          # without __init__), delegating would recurse without bound.
          if name == "_cxt":
              raise AttributeError(name)
          # Deliberately calls the context's __getattr__ hook directly, so
          # the lookup skips the context's own attributes.
          return self.get_context().__getattr__(name)

      def is_connected(self, *args, **kwargs):
          """Forward to the current context's ``is_connected``."""
          return self.get_context().is_connected(*args, **kwargs)

      def init(self, *args, **kwargs):
          """Call ``init`` on the current context and add it to the set.

          All arguments are forwarded to the context's ``init``; its result
          is returned.
          """
          cxt = self.get_context()
          ret = cxt.init(*args, **kwargs)
          with _lock:
              _all_contexts.add(cxt)
          return ret

      def shutdown(self, *args, **kwargs):
          """Shut down the current context.

          If the current context is the default one, every recorded context
          is shut down instead.
          """
          self._close_contexts("shutdown", *args, **kwargs)


  # Module-level instance of the stub.
  ray = RayAPIStub()
  @DeveloperAPI
  def num_connected_contexts():
      """Return how many client contexts are currently in `_all_contexts`."""
      # Read the size while holding the lock that guards the set.
      with _lock:
          active = len(_all_contexts)
      return active
  # Someday we might add functions to this module so that someone who runs
  # `import ray_client as ray` (importing the module) instead of
  # `from ray_client import ray` (importing the API stub) still gets the
  # expected functionality. This is the way the ray package worked in the
  # past.
  #
  # This really calls for PEP 562 (module-level `__getattr__`):
  # https://www.python.org/dev/peps/pep-0562/
  # But until Python 3.6 is EOL, here we are.