job_config.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. import uuid
  2. from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
  3. import ray.cloudpickle as pickle
  4. from ray._private.ray_logging.logging_config import LoggingConfig
  5. from ray.util.annotations import PublicAPI
  6. if TYPE_CHECKING:
  7. from ray.runtime_env import RuntimeEnv
  @PublicAPI
  class JobConfig:
      """A class used to store the configurations of a job.

      Examples:
          .. testcode::
              :hide:

              import ray
              ray.shutdown()

          .. testcode::

              import ray
              from ray.job_config import JobConfig

              ray.init(job_config=JobConfig(default_actor_lifetime="non_detached"))

      Args:
          jvm_options: The jvm options for java workers of the job.
          code_search_path: A list of directories or jar files that
              specify the search path for user code. This will be used as
              `CLASSPATH` in Java and `PYTHONPATH` in Python.
              See :ref:`Ray cross-language programming <cross_language>` for more details.
          runtime_env: A :ref:`runtime environment <runtime-environments>` dictionary.
          metadata: An opaque metadata dictionary.
          ray_namespace: A :ref:`namespace <namespaces-guide>`
              is a logical grouping of jobs and named actors.
          default_actor_lifetime: The default value of actor lifetime,
              can be "detached" or "non_detached".
              See :ref:`actor lifetimes <actor-lifetimes>` for more details.
      """

      def __init__(
          self,
          jvm_options: Optional[List[str]] = None,
          code_search_path: Optional[List[str]] = None,
          runtime_env: Optional[dict] = None,
          _client_job: bool = False,
          metadata: Optional[dict] = None,
          ray_namespace: Optional[str] = None,
          default_actor_lifetime: str = "non_detached",
          _py_driver_sys_path: Optional[List[str]] = None,
      ):
          # Memoized protobuf form of this config, built lazily by
          # `_get_proto_job_config`. `None` means "rebuild on next use".
          # Initialized first so that every setter called below can reset it.
          self._cached_pb = None
          #: The jvm options for java workers of the job.
          self.jvm_options = jvm_options or []
          #: A list of directories or jar files that
          #: specify the search path for user code.
          self.code_search_path = code_search_path or []
          # It's difficult to track down errors caused by passing
          # code_search_path as a plain string, so we assert here.
          assert isinstance(self.code_search_path, (list, tuple)), (
              f"The type of code search path is incorrect: " f"{type(code_search_path)}"
          )
          self._client_job = _client_job
          #: An opaque metadata dictionary.
          self.metadata = metadata or {}
          #: A namespace is a logical grouping of jobs and named actors.
          self.ray_namespace = ray_namespace
          self.set_runtime_env(runtime_env)
          self.set_default_actor_lifetime(default_actor_lifetime)
          # A list of directories that specify the search path for python workers.
          self._py_driver_sys_path = _py_driver_sys_path or []
          # Python logging configurations that will be passed to Ray tasks/actors.
          self.py_logging_config = None

      def set_metadata(self, key: str, value: str) -> None:
          """Add key-value pair to the metadata dictionary.

          If the key already exists, the value is overwritten to the new value.

          Examples:
              .. testcode::

                  import ray
                  from ray.job_config import JobConfig

                  job_config = JobConfig()
                  job_config.set_metadata("submitter", "foo")

          Args:
              key: The key of the metadata.
              value: The value of the metadata.
          """
          self.metadata[key] = value
          # The metadata is copied into the protobuf, so a previously built
          # protobuf no longer matches this config.
          self._cached_pb = None

      def _serialize(self) -> bytes:
          """Serialize the job config protobuf to its binary wire format."""
          return self._get_proto_job_config().SerializeToString()

      def set_runtime_env(
          self,
          runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]],
          validate: bool = False,
      ) -> None:
          """Modify the runtime_env of the JobConfig.

          We don't validate the runtime_env by default here because it may go
          through some translation before actually being passed to C++ (e.g.,
          working_dir translated from a local directory to a URI).

          Args:
              runtime_env: A :ref:`runtime environment <runtime-environments>` dictionary.
              validate: Whether to validate the runtime env.
          """
          self.runtime_env = runtime_env if runtime_env is not None else {}
          if validate:
              self.runtime_env = self._validate_runtime_env()
          self._cached_pb = None

      def set_py_logging_config(
          self,
          logging_config: Optional[LoggingConfig] = None,
      ) -> None:
          """Set the logging configuration for the job.

          The logging configuration will be applied to the root loggers of
          all Ray task and actor processes that belong to this job.

          Args:
              logging_config: The logging configuration to set.
          """
          self.py_logging_config = logging_config
          # The logging config is pickled into the protobuf, so a previously
          # built protobuf no longer matches this config.
          self._cached_pb = None

      def set_ray_namespace(self, ray_namespace: str) -> None:
          """Set Ray :ref:`namespace <namespaces-guide>`.

          Args:
              ray_namespace: The namespace to set.
          """
          # Only drop the cached protobuf when the namespace actually changes.
          if ray_namespace != self.ray_namespace:
              self.ray_namespace = ray_namespace
              self._cached_pb = None

      def set_default_actor_lifetime(self, default_actor_lifetime: str) -> None:
          """Set the default actor lifetime, which can be "detached" or "non_detached".

          See :ref:`actor lifetimes <actor-lifetimes>` for more details.

          Args:
              default_actor_lifetime: The default actor lifetime to set.

          Raises:
              ValueError: If ``default_actor_lifetime`` is neither "detached"
                  nor "non_detached".
          """
          # Imported here rather than at module level, matching the other
          # protobuf imports in this class.
          import ray.core.generated.common_pb2 as common_pb2

          if default_actor_lifetime == "detached":
              self._default_actor_lifetime = common_pb2.JobConfig.ActorLifetime.DETACHED
          elif default_actor_lifetime == "non_detached":
              self._default_actor_lifetime = (
                  common_pb2.JobConfig.ActorLifetime.NON_DETACHED
              )
          else:
              raise ValueError(
                  "Default actor lifetime must be one of `detached`, `non_detached`"
              )
          # The lifetime is written into the protobuf, so a previously built
          # protobuf no longer matches this config.
          self._cached_pb = None

      def _validate_runtime_env(self) -> "RuntimeEnv":
          """Return ``self.runtime_env`` as a ``RuntimeEnv`` checked for local paths.

          A value that is not already a ``RuntimeEnv`` is converted by passing
          it as keyword arguments to ``RuntimeEnv``. ``self.runtime_env`` itself
          is not modified.
          """
          # TODO(edoakes): this is really unfortunate, but JobConfig is imported
          # all over the place so this causes circular imports. We should remove
          # this dependency and pass in a validated runtime_env instead.
          from ray.runtime_env import RuntimeEnv
          from ray.runtime_env.runtime_env import _validate_no_local_paths

          runtime_env = self.runtime_env
          if not isinstance(runtime_env, RuntimeEnv):
              runtime_env = RuntimeEnv(**self.runtime_env)
          _validate_no_local_paths(runtime_env)
          return runtime_env

      def _get_proto_job_config(self):
          """Return the protobuf structure of JobConfig.

          The protobuf is built on first use and cached in ``_cached_pb``;
          later calls return the cached object until a setter resets the cache.
          """
          # TODO(edoakes): this is really unfortunate, but JobConfig is imported
          # all over the place so this causes circular imports. We should remove
          # this dependency and pass in a validated runtime_env instead.
          import ray.core.generated.common_pb2 as common_pb2
          from ray._private.utils import get_runtime_env_info

          if self._cached_pb is None:
              pb = common_pb2.JobConfig()
              if self.ray_namespace is None:
                  # No namespace was given: use a freshly generated UUID. A new
                  # one is generated every time the protobuf is rebuilt.
                  pb.ray_namespace = str(uuid.uuid4())
              else:
                  pb.ray_namespace = self.ray_namespace
              pb.jvm_options.extend(self.jvm_options)
              pb.code_search_path.extend(self.code_search_path)
              pb.py_driver_sys_path.extend(self._py_driver_sys_path)
              for k, v in self.metadata.items():
                  pb.metadata[k] = v

              parsed_env = self._validate_runtime_env()
              pb.runtime_env_info.CopyFrom(
                  get_runtime_env_info(
                      parsed_env,
                      is_job_runtime_env=True,
                      serialize=False,
                  )
              )

              if self._default_actor_lifetime is not None:
                  pb.default_actor_lifetime = self._default_actor_lifetime
              if self.py_logging_config:
                  pb.serialized_py_logging_config = pickle.dumps(self.py_logging_config)
              self._cached_pb = pb

          return self._cached_pb

      def _runtime_env_has_working_dir(self):
          """Return ``has_working_dir()`` of the validated runtime env."""
          return self._validate_runtime_env().has_working_dir()

      def _get_serialized_runtime_env(self) -> str:
          """Return the JSON-serialized parsed runtime env dict"""
          return self._validate_runtime_env().serialize()

      def _get_proto_runtime_env_config(self) -> str:
          """Return the ``runtime_env_config`` field of the job's runtime env info.

          The value is read from the protobuf returned by
          ``_get_proto_job_config``.
          """
          # NOTE(review): annotated as `str`, but the value is whatever type the
          # `runtime_env_config` proto field has -- confirm against the proto.
          return self._get_proto_job_config().runtime_env_info.runtime_env_config

      @classmethod
      def from_json(cls, job_config_json: Dict[str, Any]) -> "JobConfig":
          """Generates a JobConfig object from json.

          Examples:
              .. testcode::

                  from ray.job_config import JobConfig

                  job_config = JobConfig.from_json(
                      {"runtime_env": {"working_dir": "uri://abc"}})

          Args:
              job_config_json: The job config json dictionary.

          Returns:
              A new instance built from the recognized keys; missing keys fall
              back to the constructor defaults.
          """
          return cls(
              jvm_options=job_config_json.get("jvm_options", None),
              code_search_path=job_config_json.get("code_search_path", None),
              runtime_env=job_config_json.get("runtime_env", None),
              metadata=job_config_json.get("metadata", None),
              ray_namespace=job_config_json.get("ray_namespace", None),
              _client_job=job_config_json.get("client_job", False),
              _py_driver_sys_path=job_config_json.get("py_driver_sys_path", None),
          )