| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- import json
- import logging
- import os
- import shlex
- import subprocess
- import sys
- from typing import Dict, List, Optional
- from ray._private.services import get_ray_jars_dir
- from ray._private.utils import update_envs
- from ray.core.generated.common_pb2 import Language
- from ray.util.annotations import DeveloperAPI
- logger = logging.getLogger(__name__)
- @DeveloperAPI
- class RuntimeEnvContext:
- """A context used to describe the created runtime env."""
- def __init__(
- self,
- command_prefix: List[str] = None,
- env_vars: Dict[str, str] = None,
- py_executable: Optional[str] = None,
- override_worker_entrypoint: Optional[str] = None,
- java_jars: List[str] = None,
- ):
- self.command_prefix = command_prefix or []
- self.env_vars = env_vars or {}
- self.py_executable = py_executable or sys.executable
- self.override_worker_entrypoint: Optional[str] = override_worker_entrypoint
- self.java_jars = java_jars or []
- def serialize(self) -> str:
- return json.dumps(self.__dict__)
- @staticmethod
- def deserialize(json_string):
- return RuntimeEnvContext(**json.loads(json_string))
- def exec_worker(self, passthrough_args: List[str], language: Language):
- update_envs(self.env_vars)
- if language == Language.PYTHON and sys.platform == "win32":
- executable = [self.py_executable]
- elif language == Language.PYTHON:
- executable = ["exec", self.py_executable]
- elif language == Language.JAVA:
- executable = ["java"]
- ray_jars = os.path.join(get_ray_jars_dir(), "*")
- local_java_jars = []
- for java_jar in self.java_jars:
- local_java_jars.append(f"{java_jar}/*")
- local_java_jars.append(java_jar)
- class_path_args = ["-cp", ray_jars + ":" + str(":".join(local_java_jars))]
- passthrough_args = class_path_args + passthrough_args
- elif sys.platform == "win32":
- executable = []
- else:
- executable = ["exec"]
- # By default, raylet uses the path to default_worker.py on host.
- # However, the path to default_worker.py inside the container
- # can be different. We need the user to specify the path to
- # default_worker.py inside the container.
- if self.override_worker_entrypoint:
- logger.debug(
- f"Changing the worker entrypoint from {passthrough_args[0]} to "
- f"{self.override_worker_entrypoint}."
- )
- passthrough_args[0] = self.override_worker_entrypoint
- if sys.platform == "win32":
- def quote(s):
- s = s.replace("&", "%26")
- return s
- passthrough_args = [quote(s) for s in passthrough_args]
- cmd = [*self.command_prefix, *executable, *passthrough_args]
- logger.debug(f"Exec'ing worker with command: {cmd}")
- subprocess.Popen(cmd, shell=True).wait()
- else:
- # We use shlex to do the necessary shell escape
- # of special characters in passthrough_args.
- passthrough_args = [shlex.quote(s) for s in passthrough_args]
- cmd = [*self.command_prefix, *executable, *passthrough_args]
- # TODO(SongGuyang): We add this env to command for macOS because it doesn't
- # work for the C++ process of `os.execvp`. We should find a better way to
- # fix it.
- MACOS_LIBRARY_PATH_ENV_NAME = "DYLD_LIBRARY_PATH"
- if MACOS_LIBRARY_PATH_ENV_NAME in os.environ:
- cmd.insert(
- 0,
- f"{MACOS_LIBRARY_PATH_ENV_NAME}="
- f"{os.environ[MACOS_LIBRARY_PATH_ENV_NAME]}",
- )
- logger.debug(f"Exec'ing worker with command: {cmd}")
- # PyCharm will monkey patch the os.execvp at
- # .pycharm_helpers/pydev/_pydev_bundle/pydev_monkey.py
- # The monkey patched os.execvp function has a different
- # signature. So, we use os.execvp("executable", args=[])
- # instead of os.execvp(file="executable", args=[])
- os.execvp("bash", args=["bash", "-c", " ".join(cmd)])
|