utils.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. import asyncio
  2. import binascii
  3. import errno
  4. import importlib
  5. import inspect
  6. import os
  7. import random
  8. import string
  9. import sys
  10. import tempfile
  11. import time
  12. from abc import ABC, abstractmethod
  13. from inspect import signature
  14. from types import ModuleType
  15. from typing import Any, Coroutine, Dict, Optional, Tuple
  16. import psutil
  17. def import_module_and_attr(
  18. full_path: str, *, reload_module: bool = False
  19. ) -> Tuple[ModuleType, Any]:
  20. """Given a full import path to a module attr, return the imported module and attr.
  21. If `reload_module` is set, the module will be reloaded using `importlib.reload`.
  22. Args:
  23. full_path: The full import path to the module and attr.
  24. reload_module: Whether to reload the module.
  25. Returns:
  26. A tuple of the imported module and attr.
  27. """
  28. if ":" in full_path:
  29. if full_path.count(":") > 1:
  30. raise ValueError(
  31. f'Got invalid import path "{full_path}". An '
  32. "import path may have at most one colon."
  33. )
  34. module_name, attr_name = full_path.split(":")
  35. else:
  36. last_period_idx = full_path.rfind(".")
  37. module_name = full_path[:last_period_idx]
  38. attr_name = full_path[last_period_idx + 1 :]
  39. module = importlib.import_module(module_name)
  40. if reload_module:
  41. importlib.reload(module)
  42. return module, getattr(module, attr_name)
  43. def import_attr(full_path: str, *, reload_module: bool = False) -> Any:
  44. """Given a full import path to a module attr, return the imported attr.
  45. If `reload_module` is set, the module will be reloaded using `importlib.reload`.
  46. For example, the following are equivalent:
  47. MyClass = import_attr("module.submodule:MyClass")
  48. MyClass = import_attr("module.submodule.MyClass")
  49. from module.submodule import MyClass
  50. Returns:
  51. Imported attr
  52. """
  53. return import_module_and_attr(full_path, reload_module=reload_module)[1]
  54. def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
  55. """Get a running async event loop if one exists, otherwise create one.
  56. This function serves as a proxy for the deprecating get_event_loop().
  57. It tries to get the running loop first, and if no running loop
  58. could be retrieved:
  59. - For python version <3.10: it falls back to the get_event_loop
  60. call.
  61. - For python version >= 3.10: it uses the same python implementation
  62. of _get_event_loop() at asyncio/events.py.
  63. Ideally, one should use high level APIs like asyncio.run() with python
  64. version >= 3.7, if not possible, one should create and manage the event
  65. loops explicitly.
  66. """
  67. vers_info = sys.version_info
  68. if vers_info.major >= 3 and vers_info.minor >= 10:
  69. # This follows the implementation of the deprecating `get_event_loop`
  70. # in python3.10's asyncio. See python3.10/asyncio/events.py
  71. # _get_event_loop()
  72. try:
  73. loop = asyncio.get_running_loop()
  74. assert loop is not None
  75. return loop
  76. except RuntimeError as e:
  77. # No running loop, relying on the error message as for now to
  78. # differentiate runtime errors.
  79. assert "no running event loop" in str(e)
  80. return asyncio.get_event_loop_policy().get_event_loop()
  81. return asyncio.get_event_loop()
  82. _BACKGROUND_TASKS = set()
  83. def run_background_task(coroutine: Coroutine) -> asyncio.Task:
  84. """Schedule a task reliably to the event loop.
  85. This API is used when you don't want to cache the reference of `asyncio.Task`.
  86. For example,
  87. ```
  88. get_event_loop().create_task(coroutine(*args))
  89. ```
  90. The above code doesn't guarantee to schedule the coroutine to the event loops
  91. When using create_task in a "fire and forget" way, we should keep the references
  92. alive for the reliable execution. This API is used to fire and forget
  93. asynchronous execution.
  94. https://docs.python.org/3/library/asyncio-task.html#creating-tasks
  95. """
  96. task = get_or_create_event_loop().create_task(coroutine)
  97. # Add task to the set. This creates a strong reference.
  98. _BACKGROUND_TASKS.add(task)
  99. # To prevent keeping references to finished tasks forever,
  100. # make each task remove its own reference from the set after
  101. # completion:
  102. task.add_done_callback(_BACKGROUND_TASKS.discard)
  103. return task
  104. # Used in gpu detection
  105. RESOURCE_CONSTRAINT_PREFIX = "accelerator_type:"
  106. PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME = "bundle"
  107. def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
  108. """Determine a task's resource requirements.
  109. Args:
  110. options_dict: The dictionary that contains resources requirements.
  111. Returns:
  112. A dictionary of the resource requirements for the task.
  113. """
  114. resources = (options_dict.get("resources") or {}).copy()
  115. if "CPU" in resources or "GPU" in resources:
  116. raise ValueError(
  117. "The resources dictionary must not contain the key 'CPU' or 'GPU'"
  118. )
  119. elif "memory" in resources or "object_store_memory" in resources:
  120. raise ValueError(
  121. "The resources dictionary must not "
  122. "contain the key 'memory' or 'object_store_memory'"
  123. )
  124. elif PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME in resources:
  125. raise ValueError(
  126. "The resource should not include `bundle` which "
  127. f"is reserved for Ray. resources: {resources}"
  128. )
  129. num_cpus = options_dict.get("num_cpus")
  130. num_gpus = options_dict.get("num_gpus")
  131. memory = options_dict.get("memory")
  132. object_store_memory = options_dict.get("object_store_memory")
  133. accelerator_type = options_dict.get("accelerator_type")
  134. if num_cpus is not None:
  135. resources["CPU"] = num_cpus
  136. if num_gpus is not None:
  137. resources["GPU"] = num_gpus
  138. if memory is not None:
  139. resources["memory"] = int(memory)
  140. if object_store_memory is not None:
  141. resources["object_store_memory"] = object_store_memory
  142. if accelerator_type is not None:
  143. resources[f"{RESOURCE_CONSTRAINT_PREFIX}{accelerator_type}"] = 0.001
  144. return resources
  145. # Match the standard alphabet used for UUIDs.
  146. RANDOM_STRING_ALPHABET = string.ascii_lowercase + string.digits
  147. def get_random_alphanumeric_string(length: int):
  148. """Generates random string of length consisting exclusively of
  149. - Lower-case ASCII chars
  150. - Digits
  151. """
  152. return "".join(random.choices(RANDOM_STRING_ALPHABET, k=length))
# NOTE(review): nothing visible in this section reads or writes this set.
# Presumably it records warnings that were already printed so they are only
# shown once -- confirm against the rest of the file.
_PRINTED_WARNING = set()
  154. def get_call_location(back: int = 1):
  155. """
  156. Get the location (filename and line number) of a function caller, `back`
  157. frames up the stack.
  158. Args:
  159. back: The number of frames to go up the stack, not including this
  160. function.
  161. Returns:
  162. A string with the filename and line number of the caller.
  163. For example, "myfile.py:123".
  164. """
  165. stack = inspect.stack()
  166. try:
  167. frame = stack[back + 1]
  168. return f"{frame.filename}:{frame.lineno}"
  169. except IndexError:
  170. return "UNKNOWN"
  171. def get_user_temp_dir():
  172. if "RAY_TMPDIR" in os.environ:
  173. return os.environ["RAY_TMPDIR"]
  174. elif sys.platform.startswith("linux") and "TMPDIR" in os.environ:
  175. return os.environ["TMPDIR"]
  176. elif sys.platform.startswith("darwin") or sys.platform.startswith("linux"):
  177. # Ideally we wouldn't need this fallback, but keep it for now for
  178. # for compatibility
  179. tempdir = os.path.join(os.sep, "tmp")
  180. else:
  181. tempdir = tempfile.gettempdir()
  182. return tempdir
  183. def get_ray_temp_dir():
  184. return os.path.join(get_user_temp_dir(), "ray")
  185. def get_ray_address_file(temp_dir: Optional[str]):
  186. if temp_dir is None:
  187. temp_dir = get_ray_temp_dir()
  188. return os.path.join(temp_dir, "ray_current_cluster")
  189. def reset_ray_address(temp_dir: Optional[str] = None):
  190. address_file = get_ray_address_file(temp_dir)
  191. if os.path.exists(address_file):
  192. try:
  193. os.remove(address_file)
  194. except OSError:
  195. pass
  196. def load_class(path):
  197. """Load a class at runtime given a full path.
  198. Example of the path: mypkg.mysubpkg.myclass
  199. """
  200. class_data = path.split(".")
  201. if len(class_data) < 2:
  202. raise ValueError("You need to pass a valid path like mymodule.provider_class")
  203. module_path = ".".join(class_data[:-1])
  204. class_str = class_data[-1]
  205. module = importlib.import_module(module_path)
  206. return getattr(module, class_str)
  207. def get_system_memory(
  208. # For cgroups v1:
  209. memory_limit_filename: str = "/sys/fs/cgroup/memory/memory.limit_in_bytes",
  210. # For cgroups v2:
  211. memory_limit_filename_v2: str = "/sys/fs/cgroup/memory.max",
  212. ):
  213. """Return the total amount of system memory in bytes.
  214. Args:
  215. memory_limit_filename: The path to the file that contains the memory
  216. limit for the Docker container. Defaults to
  217. /sys/fs/cgroup/memory/memory.limit_in_bytes.
  218. memory_limit_filename_v2: The path to the file that contains the memory
  219. limit for the Docker container in cgroups v2. Defaults to
  220. /sys/fs/cgroup/memory.max.
  221. Returns:
  222. The total amount of system memory in bytes.
  223. """
  224. # Try to accurately figure out the memory limit if we are in a docker
  225. # container. Note that this file is not specific to Docker and its value is
  226. # often much larger than the actual amount of memory.
  227. docker_limit = None
  228. if os.path.exists(memory_limit_filename):
  229. with open(memory_limit_filename, "r") as f:
  230. docker_limit = int(f.read().strip())
  231. elif os.path.exists(memory_limit_filename_v2):
  232. with open(memory_limit_filename_v2, "r") as f:
  233. # Don't forget to strip() the newline:
  234. max_file = f.read().strip()
  235. if max_file.isnumeric():
  236. docker_limit = int(max_file)
  237. else:
  238. # max_file is "max", i.e. is unset.
  239. docker_limit = None
  240. # Use psutil if it is available.
  241. psutil_memory_in_bytes = psutil.virtual_memory().total
  242. if docker_limit is not None:
  243. # We take the min because the cgroup limit is very large if we aren't
  244. # in Docker.
  245. return min(docker_limit, psutil_memory_in_bytes)
  246. return psutil_memory_in_bytes
  247. def binary_to_hex(identifier):
  248. hex_identifier = binascii.hexlify(identifier)
  249. hex_identifier = hex_identifier.decode()
  250. return hex_identifier
  251. def hex_to_binary(hex_identifier):
  252. return binascii.unhexlify(hex_identifier)
  253. def try_make_directory_shared(directory_path):
  254. try:
  255. os.chmod(directory_path, 0o0777)
  256. except OSError as e:
  257. # Silently suppress the PermissionError that is thrown by the chmod.
  258. # This is done because the user attempting to change the permissions
  259. # on a directory may not own it. The chmod is attempted whether the
  260. # directory is new or not to avoid race conditions.
  261. # ray-project/ray/#3591
  262. if e.errno in [errno.EACCES, errno.EPERM]:
  263. pass
  264. else:
  265. raise
  266. def try_to_create_directory(directory_path):
  267. # Attempt to create a directory that is globally readable/writable.
  268. directory_path = os.path.expanduser(directory_path)
  269. os.makedirs(directory_path, exist_ok=True)
  270. # Change the log directory permissions so others can use it. This is
  271. # important when multiple people are using the same machine.
  272. try_make_directory_shared(directory_path)
  273. def get_function_args(callable):
  274. all_parameters = frozenset(signature(callable).parameters)
  275. return list(all_parameters)
  276. def decode(byte_str: str, allow_none: bool = False, encode_type: str = "utf-8"):
  277. """Make this unicode in Python 3, otherwise leave it as bytes.
  278. Args:
  279. byte_str: The byte string to decode.
  280. allow_none: If true, then we will allow byte_str to be None in which
  281. case we will return an empty string. TODO(rkn): Remove this flag.
  282. This is only here to simplify upgrading to flatbuffers 1.10.0.
  283. encode_type: The encoding type to use for decoding. Defaults to "utf-8".
  284. Returns:
  285. A byte string in Python 2 and a unicode string in Python 3.
  286. """
  287. if byte_str is None and allow_none:
  288. return ""
  289. if not isinstance(byte_str, bytes):
  290. raise ValueError(f"The argument {byte_str} must be a bytes object.")
  291. return byte_str.decode(encode_type)
  292. class TimerBase(ABC):
  293. @abstractmethod
  294. def time(self) -> float:
  295. """Return the current time."""
  296. raise NotImplementedError
  297. class Timer(TimerBase):
  298. def time(self) -> float:
  299. return time.time()