insufficient_resources_manager.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. import logging
  2. import os
  3. import time
  4. from functools import lru_cache
  5. from typing import Dict, Optional, Tuple
  6. import ray
  7. from ray.tune.execution.cluster_info import _is_ray_cluster
  8. from ray.tune.experiment import Trial
  9. logger = logging.getLogger(__name__)
  10. # Ideally we want to use @cache; but it's only available for python 3.9.
  11. # Caching is only helpful/correct for no autoscaler case.
  12. @lru_cache()
  13. def _get_cluster_resources_no_autoscaler() -> Dict:
  14. return ray.cluster_resources()
  15. def _get_trial_cpu_and_gpu(trial: Trial) -> Tuple[int, int]:
  16. cpu = trial.placement_group_factory.required_resources.get("CPU", 0)
  17. gpu = trial.placement_group_factory.required_resources.get("GPU", 0)
  18. return cpu, gpu
  19. def _can_fulfill_no_autoscaler(trial: Trial) -> bool:
  20. """Calculates if there is enough resources for a PENDING trial.
  21. For no autoscaler case.
  22. """
  23. assert trial.status == Trial.PENDING
  24. asked_cpus, asked_gpus = _get_trial_cpu_and_gpu(trial)
  25. return asked_cpus <= _get_cluster_resources_no_autoscaler().get(
  26. "CPU", 0
  27. ) and asked_gpus <= _get_cluster_resources_no_autoscaler().get("GPU", 0)
  28. @lru_cache()
  29. def _get_insufficient_resources_warning_threshold() -> float:
  30. if _is_ray_cluster():
  31. return float(
  32. os.environ.get(
  33. "TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER", "60"
  34. )
  35. )
  36. else:
  37. # Set the default to 10s so that we don't prematurely determine that
  38. # a cluster cannot fulfill the resources requirements.
  39. # TODO(xwjiang): Change it back once #18608 is resolved.
  40. return float(os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "60"))
  41. MSG_TRAIN_START = (
  42. "Training has not started in the last {wait_time:.0f} seconds. "
  43. "This could be due to the cluster not having enough resources available. "
  44. )
  45. MSG_TRAIN_INSUFFICIENT = (
  46. "You asked for {asked_cpus} CPUs and {asked_gpus} GPUs, but the cluster only "
  47. "has {cluster_cpus} CPUs and {cluster_gpus} GPUs available. "
  48. )
  49. MSG_TRAIN_END = (
  50. "Stop the training and adjust the required resources (e.g. via the "
  51. "`ScalingConfig` or `resources_per_trial`, or `num_workers` for rllib), "
  52. "or add more resources to your cluster."
  53. )
  54. MSG_TUNE_START = (
  55. "No trial is running and no new trial has been started within "
  56. "the last {wait_time:.0f} seconds. "
  57. "This could be due to the cluster not having enough resources available. "
  58. )
  59. MSG_TUNE_INSUFFICIENT = (
  60. "You asked for {asked_cpus} CPUs and {asked_gpus} GPUs per trial, "
  61. "but the cluster only has {cluster_cpus} CPUs and {cluster_gpus} GPUs available. "
  62. )
  63. MSG_TUNE_END = (
  64. "Stop the tuning and adjust the required resources (e.g. via the "
  65. "`ScalingConfig` or `resources_per_trial`, or `num_workers` for rllib), "
  66. "or add more resources to your cluster."
  67. )
  68. # TODO(xwjiang): Consider having a help page with more detailed instructions.
  69. @lru_cache()
  70. def _get_insufficient_resources_warning_msg(
  71. for_train: bool = False, trial: Optional[Trial] = None
  72. ) -> str:
  73. msg = "Ignore this message if the cluster is autoscaling. "
  74. if for_train:
  75. start = MSG_TRAIN_START
  76. insufficient = MSG_TRAIN_INSUFFICIENT
  77. end = MSG_TRAIN_END
  78. else:
  79. start = MSG_TUNE_START
  80. insufficient = MSG_TUNE_INSUFFICIENT
  81. end = MSG_TUNE_END
  82. msg += start.format(wait_time=_get_insufficient_resources_warning_threshold())
  83. if trial:
  84. asked_cpus, asked_gpus = _get_trial_cpu_and_gpu(trial)
  85. cluster_resources = _get_cluster_resources_no_autoscaler()
  86. msg += insufficient.format(
  87. asked_cpus=asked_cpus,
  88. asked_gpus=asked_gpus,
  89. cluster_cpus=cluster_resources.get("CPU", 0),
  90. cluster_gpus=cluster_resources.get("GPU", 0),
  91. )
  92. msg += end
  93. return msg
  94. class _InsufficientResourcesManager:
  95. """Insufficient resources manager.
  96. Makes best effort, conservative guesses about if Tune loop is stuck due to
  97. infeasible resources. If so, outputs usability messages for users to
  98. act upon.
  99. """
  100. def __init__(self, for_train: bool = False):
  101. # The information tracked across the life time of Tune loop.
  102. self._no_running_trials_since = -1
  103. self._last_trial_num = -1
  104. self._for_train = for_train
  105. def on_no_available_trials(self, all_trials):
  106. """Tracks information across the life of Tune loop and makes guesses
  107. about if Tune loop is stuck due to infeasible resources.
  108. If so, outputs certain warning messages.
  109. The logic should be conservative, non-intrusive and informative.
  110. For example, rate limiting is applied so that the message is not
  111. spammy.
  112. """
  113. # This is approximately saying we are not making progress.
  114. if len(all_trials) == self._last_trial_num:
  115. if self._no_running_trials_since == -1:
  116. self._no_running_trials_since = time.monotonic()
  117. elif (
  118. time.monotonic() - self._no_running_trials_since
  119. > _get_insufficient_resources_warning_threshold()
  120. ):
  121. can_fulfill_any = any(
  122. trial.status == Trial.PENDING and _can_fulfill_no_autoscaler(trial)
  123. for trial in all_trials
  124. )
  125. if can_fulfill_any:
  126. # If one trial can be fulfilled, it will be fulfilled eventually
  127. self._no_running_trials_since = -1
  128. return
  129. # Otherwise, can fulfill none
  130. msg = _get_insufficient_resources_warning_msg(
  131. for_train=self._for_train, trial=all_trials[0]
  132. )
  133. logger.warning(msg)
  134. self._no_running_trials_since = time.monotonic()
  135. else:
  136. self._no_running_trials_since = -1
  137. self._last_trial_num = len(all_trials)