uv.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. """Util class to install packages via uv.
  2. """
  3. import asyncio
  4. import hashlib
  5. import json
  6. import logging
  7. import os
  8. import shutil
  9. import sys
  10. from asyncio import create_task, get_running_loop
  11. from typing import Dict, List, Optional
  12. from ray._common.utils import try_to_create_directory
  13. from ray._private.runtime_env import dependency_utils, virtualenv_utils
  14. from ray._private.runtime_env.packaging import Protocol, parse_uri
  15. from ray._private.runtime_env.plugin import RuntimeEnvPlugin
  16. from ray._private.runtime_env.utils import check_output_cmd
  17. from ray._private.utils import get_directory_size_bytes
  18. default_logger = logging.getLogger(__name__)
  19. def _get_uv_hash(uv_dict: Dict) -> str:
  20. """Get a deterministic hash value for `uv` related runtime envs."""
  21. serialized_uv_spec = json.dumps(uv_dict, sort_keys=True)
  22. hash_val = hashlib.sha1(serialized_uv_spec.encode("utf-8")).hexdigest()
  23. return hash_val
  24. def get_uri(runtime_env: Dict) -> Optional[str]:
  25. """Return `"uv://<hashed_dependencies>"`, or None if no GC required."""
  26. uv = runtime_env.get("uv")
  27. if uv is not None:
  28. if isinstance(uv, dict):
  29. uri = "uv://" + _get_uv_hash(uv_dict=uv)
  30. elif isinstance(uv, list):
  31. uri = "uv://" + _get_uv_hash(uv_dict=dict(packages=uv))
  32. else:
  33. raise TypeError(
  34. "uv field received by RuntimeEnvAgent must be "
  35. f"list or dict, not {type(uv).__name__}."
  36. )
  37. else:
  38. uri = None
  39. return uri
  40. class UvProcessor:
  41. def __init__(
  42. self,
  43. target_dir: str,
  44. runtime_env: "RuntimeEnv", # noqa: F821
  45. logger: Optional[logging.Logger] = default_logger,
  46. ):
  47. try:
  48. import virtualenv # noqa: F401 ensure virtualenv exists.
  49. except ImportError:
  50. raise RuntimeError(
  51. f"Please install virtualenv "
  52. f"`{sys.executable} -m pip install virtualenv`"
  53. f"to enable uv runtime env."
  54. )
  55. logger.debug("Setting up uv for runtime_env: %s", runtime_env)
  56. self._target_dir = target_dir
  57. # An empty directory is created to execute cmd.
  58. self._exec_cwd = os.path.join(self._target_dir, "exec_cwd")
  59. self._runtime_env = runtime_env
  60. self._logger = logger
  61. self._uv_config = self._runtime_env.uv_config()
  62. self._uv_env = os.environ.copy()
  63. self._uv_env.update(self._runtime_env.env_vars())
  64. async def _install_uv(
  65. self, path: str, cwd: str, pip_env: dict, logger: logging.Logger
  66. ):
  67. """Before package install, make sure the required version `uv` (if specifieds)
  68. is installed.
  69. """
  70. virtualenv_path = virtualenv_utils.get_virtualenv_path(path)
  71. python = virtualenv_utils.get_virtualenv_python(path)
  72. def _get_uv_exec_to_install() -> str:
  73. """Get `uv` executable with version to install."""
  74. uv_version = self._uv_config.get("uv_version", None)
  75. if uv_version:
  76. return f"uv{uv_version}"
  77. # Use default version.
  78. return "uv"
  79. uv_install_cmd = [
  80. python,
  81. "-m",
  82. "pip",
  83. "install",
  84. "--disable-pip-version-check",
  85. "--no-cache-dir",
  86. _get_uv_exec_to_install(),
  87. ]
  88. logger.info("Installing package uv to %s", virtualenv_path)
  89. await check_output_cmd(uv_install_cmd, logger=logger, cwd=cwd, env=pip_env)
  90. async def _check_uv_existence(
  91. self, path: str, cwd: str, env: dict, logger: logging.Logger
  92. ) -> bool:
  93. """Check and return the existence of `uv` in virtual env."""
  94. python = virtualenv_utils.get_virtualenv_python(path)
  95. check_existence_cmd = [
  96. python,
  97. "-m",
  98. "uv",
  99. "version",
  100. ]
  101. try:
  102. # If `uv` doesn't exist, exception will be thrown.
  103. await check_output_cmd(check_existence_cmd, logger=logger, cwd=cwd, env=env)
  104. return True
  105. except Exception:
  106. return False
  107. async def _uv_check(sef, python: str, cwd: str, logger: logging.Logger) -> None:
  108. """Check virtual env dependency compatibility.
  109. If any incompatibility detected, exception will be thrown.
  110. param:
  111. python: the path for python executable within virtual environment.
  112. """
  113. cmd = [python, "-m", "uv", "pip", "check"]
  114. await check_output_cmd(
  115. cmd,
  116. logger=logger,
  117. cwd=cwd,
  118. )
  119. async def _install_uv_packages(
  120. self,
  121. path: str,
  122. uv_packages: List[str],
  123. cwd: str,
  124. pip_env: Dict,
  125. logger: logging.Logger,
  126. ):
  127. """Install required python packages via `uv`."""
  128. virtualenv_path = virtualenv_utils.get_virtualenv_path(path)
  129. python = virtualenv_utils.get_virtualenv_python(path)
  130. # TODO(fyrestone): Support -i, --no-deps, --no-cache-dir, ...
  131. requirements_file = dependency_utils.get_requirements_file(path, uv_packages)
  132. # Check existence for `uv` and see if we could skip `uv` installation.
  133. uv_exists = await self._check_uv_existence(path, cwd, pip_env, logger)
  134. # Install uv, which acts as the default package manager.
  135. if (not uv_exists) or (self._uv_config.get("uv_version", None) is not None):
  136. await self._install_uv(path, cwd, pip_env, logger)
  137. # Avoid blocking the event loop.
  138. loop = get_running_loop()
  139. await loop.run_in_executor(
  140. None, dependency_utils.gen_requirements_txt, requirements_file, uv_packages
  141. )
  142. # Install all dependencies.
  143. #
  144. # Difference with pip:
  145. # 1. `--disable-pip-version-check` has no effect for uv.
  146. uv_install_cmd = [
  147. python,
  148. "-m",
  149. "uv",
  150. "pip",
  151. "install",
  152. "-r",
  153. requirements_file,
  154. ]
  155. uv_opt_list = self._uv_config.get("uv_pip_install_options", ["--no-cache"])
  156. if uv_opt_list:
  157. uv_install_cmd += uv_opt_list
  158. logger.info("Installing python requirements to %s", virtualenv_path)
  159. await check_output_cmd(uv_install_cmd, logger=logger, cwd=cwd, env=pip_env)
  160. # Check python environment for conflicts.
  161. if self._uv_config.get("uv_check", False):
  162. await self._uv_check(python, cwd, logger)
  163. async def _run(self):
  164. path = self._target_dir
  165. logger = self._logger
  166. uv_packages = self._uv_config["packages"]
  167. # We create an empty directory for exec cmd so that the cmd will
  168. # run more stable. e.g. if cwd has ray, then checking ray will
  169. # look up ray in cwd instead of site packages.
  170. os.makedirs(self._exec_cwd, exist_ok=True)
  171. try:
  172. await virtualenv_utils.create_or_get_virtualenv(
  173. path, self._exec_cwd, logger
  174. )
  175. python = virtualenv_utils.get_virtualenv_python(path)
  176. async with dependency_utils.check_ray(python, self._exec_cwd, logger):
  177. # Install packages with uv.
  178. await self._install_uv_packages(
  179. path,
  180. uv_packages,
  181. self._exec_cwd,
  182. self._uv_env,
  183. logger,
  184. )
  185. except Exception:
  186. logger.info("Delete incomplete virtualenv: %s", path)
  187. shutil.rmtree(path, ignore_errors=True)
  188. logger.exception("Failed to install uv packages.")
  189. raise
  190. def __await__(self):
  191. return self._run().__await__()
  192. class UvPlugin(RuntimeEnvPlugin):
  193. name = "uv"
  194. def __init__(self, resources_dir: str):
  195. self._uv_resource_dir = os.path.join(resources_dir, "uv")
  196. self._creating_task = {}
  197. # Maps a URI to a lock that is used to prevent multiple concurrent
  198. # installs of the same virtualenv, see #24513
  199. self._create_locks: Dict[str, asyncio.Lock] = {}
  200. # Key: created hashes. Value: size of the uv dir.
  201. self._created_hash_bytes: Dict[str, int] = {}
  202. try_to_create_directory(self._uv_resource_dir)
  203. def _get_path_from_hash(self, hash_val: str) -> str:
  204. """Generate a path from the hash of a uv spec.
  205. Example output:
  206. /tmp/ray/session_2021-11-03_16-33-59_356303_41018/runtime_resources
  207. /uv/ray-9a7972c3a75f55e976e620484f58410c920db091
  208. """
  209. return os.path.join(self._uv_resource_dir, hash_val)
  210. def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]: # noqa: F821
  211. """Return the uv URI from the RuntimeEnv if it exists, else return []."""
  212. uv_uri = runtime_env.uv_uri()
  213. if uv_uri:
  214. return [uv_uri]
  215. return []
  216. def delete_uri(
  217. self, uri: str, logger: Optional[logging.Logger] = default_logger
  218. ) -> int:
  219. """Delete URI and return the number of bytes deleted."""
  220. logger.info("Got request to delete uv URI %s", uri)
  221. protocol, hash_val = parse_uri(uri)
  222. if protocol != Protocol.UV:
  223. raise ValueError(
  224. "UvPlugin can only delete URIs with protocol "
  225. f"uv. Received protocol {protocol}, URI {uri}"
  226. )
  227. # Cancel running create task.
  228. task = self._creating_task.pop(hash_val, None)
  229. if task is not None:
  230. task.cancel()
  231. del self._created_hash_bytes[hash_val]
  232. uv_env_path = self._get_path_from_hash(hash_val)
  233. local_dir_size = get_directory_size_bytes(uv_env_path)
  234. del self._create_locks[uri]
  235. try:
  236. shutil.rmtree(uv_env_path)
  237. except OSError as e:
  238. logger.warning(f"Error when deleting uv env {uv_env_path}: {str(e)}")
  239. return 0
  240. return local_dir_size
  241. async def create(
  242. self,
  243. uri: str,
  244. runtime_env: "RuntimeEnv", # noqa: F821
  245. context: "RuntimeEnvContext", # noqa: F821
  246. logger: Optional[logging.Logger] = default_logger,
  247. ) -> int:
  248. if not runtime_env.has_uv():
  249. return 0
  250. protocol, hash_val = parse_uri(uri)
  251. target_dir = self._get_path_from_hash(hash_val)
  252. async def _create_for_hash():
  253. await UvProcessor(
  254. target_dir,
  255. runtime_env,
  256. logger,
  257. )
  258. loop = get_running_loop()
  259. return await loop.run_in_executor(
  260. None, get_directory_size_bytes, target_dir
  261. )
  262. if uri not in self._create_locks:
  263. # async lock to prevent the same virtualenv being concurrently installed
  264. self._create_locks[uri] = asyncio.Lock()
  265. async with self._create_locks[uri]:
  266. if hash_val in self._created_hash_bytes:
  267. return self._created_hash_bytes[hash_val]
  268. self._creating_task[hash_val] = task = create_task(_create_for_hash())
  269. task.add_done_callback(lambda _: self._creating_task.pop(hash_val, None))
  270. uv_dir_bytes = await task
  271. self._created_hash_bytes[hash_val] = uv_dir_bytes
  272. return uv_dir_bytes
  273. def modify_context(
  274. self,
  275. uris: List[str],
  276. runtime_env: "RuntimeEnv", # noqa: F821
  277. context: "RuntimeEnvContext", # noqa: F821
  278. logger: logging.Logger = default_logger,
  279. ):
  280. if not runtime_env.has_uv():
  281. return
  282. # UvPlugin only uses a single URI.
  283. uri = uris[0]
  284. # Update py_executable.
  285. protocol, hash_val = parse_uri(uri)
  286. target_dir = self._get_path_from_hash(hash_val)
  287. virtualenv_python = virtualenv_utils.get_virtualenv_python(target_dir)
  288. if not os.path.exists(virtualenv_python):
  289. raise ValueError(
  290. f"Local directory {target_dir} for URI {uri} does "
  291. "not exist on the cluster. Something may have gone wrong while "
  292. "installing the runtime_env `uv` packages."
  293. )
  294. context.py_executable = virtualenv_python
  295. context.command_prefix += virtualenv_utils.get_virtualenv_activate_command(
  296. target_dir
  297. )