resource_updater.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. import logging
  2. import os
  3. import time
  4. from collections import namedtuple
  5. from numbers import Number
  6. from typing import Any, Dict, Optional
  7. import ray
  8. from ray._common.constants import NODE_ID_PREFIX
  9. logger = logging.getLogger(__name__)
  10. TUNE_STATE_REFRESH_PERIOD = 10 # Refresh resources every 10 s
  11. def _to_gb(n_bytes):
  12. return round(n_bytes / (1024**3), 2)
  13. class _Resources(
  14. namedtuple(
  15. "_Resources",
  16. [
  17. "cpu",
  18. "gpu",
  19. "memory",
  20. "object_store_memory",
  21. "extra_cpu",
  22. "extra_gpu",
  23. "extra_memory",
  24. "extra_object_store_memory",
  25. "custom_resources",
  26. "extra_custom_resources",
  27. "has_placement_group",
  28. ],
  29. )
  30. ):
  31. """Ray resources required to schedule a trial.
  32. Parameters:
  33. cpu: Number of CPUs to allocate to the trial.
  34. gpu: Number of GPUs to allocate to the trial.
  35. memory: Memory to reserve for the trial.
  36. object_store_memory: Object store memory to reserve.
  37. extra_cpu: Extra CPUs to reserve in case the trial needs to
  38. launch additional Ray actors that use CPUs.
  39. extra_gpu: Extra GPUs to reserve in case the trial needs to
  40. launch additional Ray actors that use GPUs.
  41. extra_memory: Memory to reserve for the trial launching
  42. additional Ray actors that use memory.
  43. extra_object_store_memory: Object store memory to reserve for
  44. the trial launching additional Ray actors that use object store
  45. memory.
  46. custom_resources: Mapping of resource to quantity to allocate
  47. to the trial.
  48. extra_custom_resources: Extra custom resources to reserve in
  49. case the trial needs to launch additional Ray actors that use
  50. any of these custom resources.
  51. has_placement_group: Bool indicating if the trial also
  52. has an associated placement group.
  53. """
  54. __slots__ = ()
  55. def __new__(
  56. cls,
  57. cpu: float,
  58. gpu: float,
  59. memory: float = 0,
  60. object_store_memory: float = 0.0,
  61. extra_cpu: float = 0.0,
  62. extra_gpu: float = 0.0,
  63. extra_memory: float = 0.0,
  64. extra_object_store_memory: float = 0.0,
  65. custom_resources: Optional[dict] = None,
  66. extra_custom_resources: Optional[dict] = None,
  67. has_placement_group: bool = False,
  68. ):
  69. custom_resources = custom_resources or {}
  70. extra_custom_resources = extra_custom_resources or {}
  71. leftovers = set(custom_resources) ^ set(extra_custom_resources)
  72. for value in leftovers:
  73. custom_resources.setdefault(value, 0)
  74. extra_custom_resources.setdefault(value, 0)
  75. cpu = round(cpu, 2)
  76. gpu = round(gpu, 2)
  77. memory = round(memory, 2)
  78. object_store_memory = round(object_store_memory, 2)
  79. extra_cpu = round(extra_cpu, 2)
  80. extra_gpu = round(extra_gpu, 2)
  81. extra_memory = round(extra_memory, 2)
  82. extra_object_store_memory = round(extra_object_store_memory, 2)
  83. custom_resources = {
  84. resource: round(value, 2) for resource, value in custom_resources.items()
  85. }
  86. extra_custom_resources = {
  87. resource: round(value, 2)
  88. for resource, value in extra_custom_resources.items()
  89. }
  90. all_values = [
  91. cpu,
  92. gpu,
  93. memory,
  94. object_store_memory,
  95. extra_cpu,
  96. extra_gpu,
  97. extra_memory,
  98. extra_object_store_memory,
  99. ]
  100. all_values += list(custom_resources.values())
  101. all_values += list(extra_custom_resources.values())
  102. assert len(custom_resources) == len(extra_custom_resources)
  103. for entry in all_values:
  104. assert isinstance(entry, Number), ("Improper resource value.", entry)
  105. return super(_Resources, cls).__new__(
  106. cls,
  107. cpu,
  108. gpu,
  109. memory,
  110. object_store_memory,
  111. extra_cpu,
  112. extra_gpu,
  113. extra_memory,
  114. extra_object_store_memory,
  115. custom_resources,
  116. extra_custom_resources,
  117. has_placement_group,
  118. )
  119. def summary_string(self):
  120. summary = "{} CPUs, {} GPUs".format(
  121. self.cpu + self.extra_cpu, self.gpu + self.extra_gpu
  122. )
  123. if self.memory or self.extra_memory:
  124. summary += ", {} GiB heap".format(
  125. round((self.memory + self.extra_memory) / (1024**3), 2)
  126. )
  127. if self.object_store_memory or self.extra_object_store_memory:
  128. summary += ", {} GiB objects".format(
  129. round(
  130. (self.object_store_memory + self.extra_object_store_memory)
  131. / (1024**3),
  132. 2,
  133. )
  134. )
  135. custom_summary = ", ".join(
  136. [
  137. "{} {}".format(self.get_res_total(res), res)
  138. for res in self.custom_resources
  139. if not res.startswith(NODE_ID_PREFIX)
  140. ]
  141. )
  142. if custom_summary:
  143. summary += " ({})".format(custom_summary)
  144. return summary
  145. def cpu_total(self):
  146. return self.cpu + self.extra_cpu
  147. def gpu_total(self):
  148. return self.gpu + self.extra_gpu
  149. def memory_total(self):
  150. return self.memory + self.extra_memory
  151. def object_store_memory_total(self):
  152. return self.object_store_memory + self.extra_object_store_memory
  153. def get_res_total(self, key):
  154. return self.custom_resources.get(key, 0) + self.extra_custom_resources.get(
  155. key, 0
  156. )
  157. def get(self, key):
  158. return self.custom_resources.get(key, 0)
  159. def is_nonnegative(self):
  160. all_values = [self.cpu, self.gpu, self.extra_cpu, self.extra_gpu]
  161. all_values += list(self.custom_resources.values())
  162. all_values += list(self.extra_custom_resources.values())
  163. return all(v >= 0 for v in all_values)
  164. @classmethod
  165. def subtract(cls, original, to_remove):
  166. cpu = original.cpu - to_remove.cpu
  167. gpu = original.gpu - to_remove.gpu
  168. memory = original.memory - to_remove.memory
  169. object_store_memory = (
  170. original.object_store_memory - to_remove.object_store_memory
  171. )
  172. extra_cpu = original.extra_cpu - to_remove.extra_cpu
  173. extra_gpu = original.extra_gpu - to_remove.extra_gpu
  174. extra_memory = original.extra_memory - to_remove.extra_memory
  175. extra_object_store_memory = (
  176. original.extra_object_store_memory - to_remove.extra_object_store_memory
  177. )
  178. all_resources = set(original.custom_resources).union(
  179. set(to_remove.custom_resources)
  180. )
  181. new_custom_res = {
  182. k: original.custom_resources.get(k, 0)
  183. - to_remove.custom_resources.get(k, 0)
  184. for k in all_resources
  185. }
  186. extra_custom_res = {
  187. k: original.extra_custom_resources.get(k, 0)
  188. - to_remove.extra_custom_resources.get(k, 0)
  189. for k in all_resources
  190. }
  191. return _Resources(
  192. cpu,
  193. gpu,
  194. memory,
  195. object_store_memory,
  196. extra_cpu,
  197. extra_gpu,
  198. extra_memory,
  199. extra_object_store_memory,
  200. new_custom_res,
  201. extra_custom_res,
  202. )
  203. class _ResourceUpdater:
  204. """Periodic Resource updater for Tune.
  205. Initially, all resources are set to 0. The updater will try to update resources
  206. when (1) init ResourceUpdater (2) call "update_avail_resources", "num_cpus"
  207. or "num_gpus".
  208. The update takes effect when (1) Ray is initialized (2) the interval between
  209. this and last update is larger than "refresh_period"
  210. """
  211. def __init__(self, refresh_period: Optional[float] = None):
  212. self._avail_resources = _Resources(cpu=0, gpu=0)
  213. if refresh_period is None:
  214. refresh_period = float(
  215. os.environ.get("TUNE_STATE_REFRESH_PERIOD", TUNE_STATE_REFRESH_PERIOD)
  216. )
  217. self._refresh_period = refresh_period
  218. self._last_resource_refresh = float("-inf")
  219. self.update_avail_resources()
  220. def update_avail_resources(self, num_retries: int = 5, force: bool = False):
  221. if not ray.is_initialized():
  222. return
  223. if (
  224. time.time() - self._last_resource_refresh < self._refresh_period
  225. and not force
  226. ):
  227. return
  228. logger.debug("Checking Ray cluster resources.")
  229. resources = None
  230. for i in range(num_retries):
  231. if i > 0:
  232. logger.warning(
  233. f"Cluster resources not detected or are 0. Attempt #{i + 1}...",
  234. )
  235. time.sleep(0.5)
  236. resources = ray.cluster_resources()
  237. if resources:
  238. break
  239. if not resources:
  240. # NOTE: This hides the possibility that Ray may be waiting for
  241. # clients to connect.
  242. resources.setdefault("CPU", 0)
  243. resources.setdefault("GPU", 0)
  244. logger.warning(
  245. "Cluster resources cannot be detected or are 0. "
  246. "You can resume this experiment by passing in `resume=True` to `run`."
  247. )
  248. resources = resources.copy()
  249. num_cpus = resources.pop("CPU", 0)
  250. num_gpus = resources.pop("GPU", 0)
  251. memory = resources.pop("memory", 0)
  252. object_store_memory = resources.pop("object_store_memory", 0)
  253. custom_resources = resources
  254. self._avail_resources = _Resources(
  255. int(num_cpus),
  256. int(num_gpus),
  257. memory=int(memory),
  258. object_store_memory=int(object_store_memory),
  259. custom_resources=custom_resources,
  260. )
  261. self._last_resource_refresh = time.time()
  262. def _get_used_avail_resources(self, total_allocated_resources: Dict[str, Any]):
  263. total_allocated_resources = total_allocated_resources.copy()
  264. used_cpu = total_allocated_resources.pop("CPU", 0)
  265. total_cpu = self._avail_resources.cpu
  266. used_gpu = total_allocated_resources.pop("GPU", 0)
  267. total_gpu = self._avail_resources.gpu
  268. custom_used_total = {
  269. name: (
  270. total_allocated_resources.get(name, 0.0),
  271. self._avail_resources.get_res_total(name),
  272. )
  273. for name in self._avail_resources.custom_resources
  274. if not name.startswith(NODE_ID_PREFIX)
  275. and (total_allocated_resources.get(name, 0.0) > 0 or "_group_" not in name)
  276. }
  277. return used_cpu, total_cpu, used_gpu, total_gpu, custom_used_total
  278. def debug_string(self, total_allocated_resources: Dict[str, Any]) -> str:
  279. """Returns a human readable message for printing to the console."""
  280. if self._last_resource_refresh > 0:
  281. (
  282. used_cpu,
  283. total_cpu,
  284. used_gpu,
  285. total_gpu,
  286. custom_used_total,
  287. ) = self._get_used_avail_resources(total_allocated_resources)
  288. if (
  289. used_cpu > total_cpu
  290. or used_gpu > total_gpu
  291. or any(used > total for (used, total) in custom_used_total.values())
  292. ):
  293. # If any of the used resources are higher than what we currently think
  294. # is available, update our state and re-fetch
  295. self.update_avail_resources(force=True)
  296. (
  297. used_cpu,
  298. total_cpu,
  299. used_gpu,
  300. total_gpu,
  301. custom_used_total,
  302. ) = self._get_used_avail_resources(total_allocated_resources)
  303. status = (
  304. f"Logical resource usage: {used_cpu}/{total_cpu} CPUs, "
  305. f"{used_gpu}/{total_gpu} GPUs"
  306. )
  307. customs = ", ".join(
  308. f"{used}/{total} {name}"
  309. for name, (used, total) in custom_used_total.items()
  310. )
  311. if customs:
  312. status += f" ({customs})"
  313. return status
  314. else:
  315. return "Logical resource usage: ?"
  316. def get_num_cpus(self) -> int:
  317. self.update_avail_resources()
  318. return self._avail_resources.cpu
  319. def get_num_gpus(self) -> int:
  320. self.update_avail_resources()
  321. return self._avail_resources.gpu
  322. def __reduce__(self):
  323. # Do not need to serialize resources, because we can always
  324. # update it again. This also prevents keeping outdated resources
  325. # when deserialized.
  326. return _ResourceUpdater, (self._refresh_period,)