| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- import uuid
- from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
- import ray.cloudpickle as pickle
- from ray._private.ray_logging.logging_config import LoggingConfig
- from ray.util.annotations import PublicAPI
- if TYPE_CHECKING:
- from ray.runtime_env import RuntimeEnv
@PublicAPI
class JobConfig:
    """A class used to store the configurations of a job.

    The protobuf form of the config is built lazily by
    ``_get_proto_job_config`` and cached. Every public ``set_*`` method that
    changes a serialized value discards that cache, so the next serialization
    reflects the change. Attributes assigned directly
    (e.g. ``config.jvm_options = [...]``) bypass this invalidation.

    Examples:
        .. testcode::
            :hide:

            import ray
            ray.shutdown()

        .. testcode::

            import ray
            from ray.job_config import JobConfig

            ray.init(job_config=JobConfig(default_actor_lifetime="non_detached"))

    Args:
        jvm_options: The jvm options for java workers of the job.
        code_search_path: A list of directories or jar files that
            specify the search path for user code. This will be used as
            `CLASSPATH` in Java and `PYTHONPATH` in Python.
            See :ref:`Ray cross-language programming <cross_language>` for more details.
        runtime_env: A :ref:`runtime environment <runtime-environments>` dictionary.
        metadata: An opaque metadata dictionary.
        ray_namespace: A :ref:`namespace <namespaces-guide>`
            is a logical grouping of jobs and named actors.
        default_actor_lifetime: The default value of actor lifetime,
            can be "detached" or "non_detached".
            See :ref:`actor lifetimes <actor-lifetimes>` for more details.
    """

    def __init__(
        self,
        jvm_options: Optional[List[str]] = None,
        code_search_path: Optional[List[str]] = None,
        runtime_env: Optional[dict] = None,
        _client_job: bool = False,
        metadata: Optional[dict] = None,
        ray_namespace: Optional[str] = None,
        default_actor_lifetime: str = "non_detached",
        _py_driver_sys_path: Optional[List[str]] = None,
    ):
        # Lazily-built protobuf JobConfig; None means "rebuild on next use".
        self._cached_pb = None
        #: The jvm options for java workers of the job.
        self.jvm_options = jvm_options or []
        #: A list of directories or jar files that
        #: specify the search path for user code.
        self.code_search_path = code_search_path or []
        # Passing a plain string here is an easy mistake whose consequences
        # are hard to trace later, so fail fast.
        assert isinstance(self.code_search_path, (list, tuple)), (
            f"The type of code search path is incorrect: {type(code_search_path)}"
        )
        self._client_job = _client_job
        #: An opaque metadata dictionary.
        self.metadata = metadata or {}
        #: A namespace is a logical grouping of jobs and named actors.
        self.ray_namespace = ray_namespace
        self.set_runtime_env(runtime_env)
        self.set_default_actor_lifetime(default_actor_lifetime)
        # A list of directories that specify the search path for python workers.
        self._py_driver_sys_path = _py_driver_sys_path or []
        # Python logging configurations that will be passed to Ray tasks/actors.
        self.py_logging_config = None

    def set_metadata(self, key: str, value: str) -> None:
        """Add key-value pair to the metadata dictionary.

        If the key already exists, the value is overwritten to the new value.

        Examples:
            .. testcode::

                import ray
                from ray.job_config import JobConfig

                job_config = JobConfig()
                job_config.set_metadata("submitter", "foo")

        Args:
            key: The key of the metadata.
            value: The value of the metadata.
        """
        self.metadata[key] = value
        # Metadata is copied into the protobuf, so a cached copy is now stale.
        self._cached_pb = None

    def _serialize(self) -> bytes:
        """Serialize the config into protobuf wire-format bytes."""
        return self._get_proto_job_config().SerializeToString()

    def set_runtime_env(
        self,
        runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]],
        validate: bool = False,
    ) -> None:
        """Modify the runtime_env of the JobConfig.

        We don't validate the runtime_env by default here because it may go
        through some translation before actually being passed to C++ (e.g.,
        working_dir translated from a local directory to a URI).

        Args:
            runtime_env: A :ref:`runtime environment <runtime-environments>`
                dictionary. ``None`` is stored as an empty dict.
            validate: Whether to validate the runtime env. When True, the
                stored value is replaced by the result of
                ``_validate_runtime_env``.
        """
        self.runtime_env = runtime_env if runtime_env is not None else {}
        # Invalidate before validating: if validation raises, no protobuf
        # built from the previous runtime_env may remain cached.
        self._cached_pb = None
        if validate:
            self.runtime_env = self._validate_runtime_env()

    def set_py_logging_config(
        self,
        logging_config: Optional[LoggingConfig] = None,
    ):
        """Set the logging configuration for the job.

        The logging configuration will be applied to the root loggers of
        all Ray task and actor processes that belong to this job.

        Args:
            logging_config: The logging configuration to set.
        """
        self.py_logging_config = logging_config
        # The logging config is pickled into the protobuf; drop the stale copy.
        self._cached_pb = None

    def set_ray_namespace(self, ray_namespace: str) -> None:
        """Set Ray :ref:`namespace <namespaces-guide>`.

        The cached protobuf is only discarded when the namespace changes.

        Args:
            ray_namespace: The namespace to set.
        """
        if ray_namespace != self.ray_namespace:
            self.ray_namespace = ray_namespace
            self._cached_pb = None

    def set_default_actor_lifetime(self, default_actor_lifetime: str) -> None:
        """Set the default actor lifetime, which can be "detached" or "non_detached".

        See :ref:`actor lifetimes <actor-lifetimes>` for more details.

        Args:
            default_actor_lifetime: The default actor lifetime to set.

        Raises:
            ValueError: If ``default_actor_lifetime`` is not one of
                "detached" or "non_detached".
        """
        import ray.core.generated.common_pb2 as common_pb2

        if default_actor_lifetime == "detached":
            self._default_actor_lifetime = common_pb2.JobConfig.ActorLifetime.DETACHED
        elif default_actor_lifetime == "non_detached":
            self._default_actor_lifetime = (
                common_pb2.JobConfig.ActorLifetime.NON_DETACHED
            )
        else:
            raise ValueError(
                "Default actor lifetime must be one of `detached`, `non_detached`"
            )
        # Only reached on a valid value; the lifetime is part of the protobuf.
        self._cached_pb = None

    def _validate_runtime_env(self):
        """Return ``self.runtime_env`` as a ``RuntimeEnv`` after local-path checks.

        A plain dict is converted via ``RuntimeEnv(**runtime_env)``; an
        existing ``RuntimeEnv`` is used as-is. The result is then passed to
        ``_validate_no_local_paths``, which presumably raises when local paths
        are present (NOTE(review): confirm its exact contract).
        """
        # TODO(edoakes): this is really unfortunate, but JobConfig is imported
        # all over the place so this causes circular imports. We should remove
        # this dependency and pass in a validated runtime_env instead.
        from ray.runtime_env import RuntimeEnv
        from ray.runtime_env.runtime_env import _validate_no_local_paths

        runtime_env = self.runtime_env
        if not isinstance(runtime_env, RuntimeEnv):
            runtime_env = RuntimeEnv(**self.runtime_env)
        _validate_no_local_paths(runtime_env)
        return runtime_env

    def _get_proto_job_config(self):
        """Return the protobuf structure of JobConfig, building it if needed.

        The result is cached until a setter invalidates it. When
        ``ray_namespace`` is None, a random UUID4 string is used as the
        namespace; a new one is drawn every time the cache is rebuilt.
        """
        # TODO(edoakes): this is really unfortunate, but JobConfig is imported
        # all over the place so this causes circular imports. We should remove
        # this dependency and pass in a validated runtime_env instead.
        import ray.core.generated.common_pb2 as common_pb2
        from ray._private.utils import get_runtime_env_info

        if self._cached_pb is not None:
            return self._cached_pb

        pb = common_pb2.JobConfig()
        if self.ray_namespace is None:
            pb.ray_namespace = str(uuid.uuid4())
        else:
            pb.ray_namespace = self.ray_namespace
        pb.jvm_options.extend(self.jvm_options)
        pb.code_search_path.extend(self.code_search_path)
        pb.py_driver_sys_path.extend(self._py_driver_sys_path)
        for k, v in self.metadata.items():
            pb.metadata[k] = v

        parsed_env = self._validate_runtime_env()
        pb.runtime_env_info.CopyFrom(
            get_runtime_env_info(
                parsed_env,
                is_job_runtime_env=True,
                serialize=False,
            )
        )

        if self._default_actor_lifetime is not None:
            pb.default_actor_lifetime = self._default_actor_lifetime
        if self.py_logging_config:
            pb.serialized_py_logging_config = pickle.dumps(self.py_logging_config)

        self._cached_pb = pb
        return self._cached_pb

    def _runtime_env_has_working_dir(self):
        """Return ``has_working_dir()`` of the validated runtime env."""
        return self._validate_runtime_env().has_working_dir()

    def _get_serialized_runtime_env(self) -> str:
        """Return the JSON-serialized parsed runtime env dict"""
        return self._validate_runtime_env().serialize()

    def _get_proto_runtime_env_config(self):
        """Return the ``runtime_env_config`` field of the job's protobuf.

        This is read from ``runtime_env_info`` of the (possibly cached)
        protobuf JobConfig. It is a protobuf field value, not a JSON string.
        """
        return self._get_proto_job_config().runtime_env_info.runtime_env_config

    @classmethod
    def from_json(cls, job_config_json):
        """Generates a JobConfig object from json.

        Examples:
            .. testcode::

                from ray.job_config import JobConfig

                job_config = JobConfig.from_json(
                    {"runtime_env": {"working_dir": "uri://abc"}})

        Args:
            job_config_json: The job config json dictionary. Missing keys
                fall back to the constructor defaults; a missing or null
                ``default_actor_lifetime`` means "non_detached".
        """
        return cls(
            jvm_options=job_config_json.get("jvm_options", None),
            code_search_path=job_config_json.get("code_search_path", None),
            runtime_env=job_config_json.get("runtime_env", None),
            metadata=job_config_json.get("metadata", None),
            ray_namespace=job_config_json.get("ray_namespace", None),
            default_actor_lifetime=(
                job_config_json.get("default_actor_lifetime") or "non_detached"
            ),
            _client_job=job_config_json.get("client_job", False),
            _py_driver_sys_path=job_config_json.get("py_driver_sys_path", None),
        )
|