context.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. import json
  2. import logging
  3. import os
  4. import shlex
  5. import subprocess
  6. import sys
  7. from typing import Dict, List, Optional
  8. from ray._private.services import get_ray_jars_dir
  9. from ray._private.utils import update_envs
  10. from ray.core.generated.common_pb2 import Language
  11. from ray.util.annotations import DeveloperAPI
  12. logger = logging.getLogger(__name__)
  13. @DeveloperAPI
  14. class RuntimeEnvContext:
  15. """A context used to describe the created runtime env."""
  16. def __init__(
  17. self,
  18. command_prefix: List[str] = None,
  19. env_vars: Dict[str, str] = None,
  20. py_executable: Optional[str] = None,
  21. override_worker_entrypoint: Optional[str] = None,
  22. java_jars: List[str] = None,
  23. ):
  24. self.command_prefix = command_prefix or []
  25. self.env_vars = env_vars or {}
  26. self.py_executable = py_executable or sys.executable
  27. self.override_worker_entrypoint: Optional[str] = override_worker_entrypoint
  28. self.java_jars = java_jars or []
  29. def serialize(self) -> str:
  30. return json.dumps(self.__dict__)
  31. @staticmethod
  32. def deserialize(json_string):
  33. return RuntimeEnvContext(**json.loads(json_string))
  34. def exec_worker(self, passthrough_args: List[str], language: Language):
  35. update_envs(self.env_vars)
  36. if language == Language.PYTHON and sys.platform == "win32":
  37. executable = [self.py_executable]
  38. elif language == Language.PYTHON:
  39. executable = ["exec", self.py_executable]
  40. elif language == Language.JAVA:
  41. executable = ["java"]
  42. ray_jars = os.path.join(get_ray_jars_dir(), "*")
  43. local_java_jars = []
  44. for java_jar in self.java_jars:
  45. local_java_jars.append(f"{java_jar}/*")
  46. local_java_jars.append(java_jar)
  47. class_path_args = ["-cp", ray_jars + ":" + str(":".join(local_java_jars))]
  48. passthrough_args = class_path_args + passthrough_args
  49. elif sys.platform == "win32":
  50. executable = []
  51. else:
  52. executable = ["exec"]
  53. # By default, raylet uses the path to default_worker.py on host.
  54. # However, the path to default_worker.py inside the container
  55. # can be different. We need the user to specify the path to
  56. # default_worker.py inside the container.
  57. if self.override_worker_entrypoint:
  58. logger.debug(
  59. f"Changing the worker entrypoint from {passthrough_args[0]} to "
  60. f"{self.override_worker_entrypoint}."
  61. )
  62. passthrough_args[0] = self.override_worker_entrypoint
  63. if sys.platform == "win32":
  64. def quote(s):
  65. s = s.replace("&", "%26")
  66. return s
  67. passthrough_args = [quote(s) for s in passthrough_args]
  68. cmd = [*self.command_prefix, *executable, *passthrough_args]
  69. logger.debug(f"Exec'ing worker with command: {cmd}")
  70. subprocess.Popen(cmd, shell=True).wait()
  71. else:
  72. # We use shlex to do the necessary shell escape
  73. # of special characters in passthrough_args.
  74. passthrough_args = [shlex.quote(s) for s in passthrough_args]
  75. cmd = [*self.command_prefix, *executable, *passthrough_args]
  76. # TODO(SongGuyang): We add this env to command for macOS because it doesn't
  77. # work for the C++ process of `os.execvp`. We should find a better way to
  78. # fix it.
  79. MACOS_LIBRARY_PATH_ENV_NAME = "DYLD_LIBRARY_PATH"
  80. if MACOS_LIBRARY_PATH_ENV_NAME in os.environ:
  81. cmd.insert(
  82. 0,
  83. f"{MACOS_LIBRARY_PATH_ENV_NAME}="
  84. f"{os.environ[MACOS_LIBRARY_PATH_ENV_NAME]}",
  85. )
  86. logger.debug(f"Exec'ing worker with command: {cmd}")
  87. # PyCharm will monkey patch the os.execvp at
  88. # .pycharm_helpers/pydev/_pydev_bundle/pydev_monkey.py
  89. # The monkey patched os.execvp function has a different
  90. # signature. So, we use os.execvp("executable", args=[])
  91. # instead of os.execvp(file="executable", args=[])
  92. os.execvp("bash", args=["bash", "-c", " ".join(cmd)])