| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- import logging
- import os
- import time
- from functools import lru_cache
- from typing import Dict, Optional, Tuple
- import ray
- from ray.tune.execution.cluster_info import _is_ray_cluster
- from ray.tune.experiment import Trial
- logger = logging.getLogger(__name__)
- # Ideally we want to use @cache; but it's only available for python 3.9.
- # Caching is only helpful/correct for no autoscaler case.
- @lru_cache()
- def _get_cluster_resources_no_autoscaler() -> Dict:
- return ray.cluster_resources()
- def _get_trial_cpu_and_gpu(trial: Trial) -> Tuple[int, int]:
- cpu = trial.placement_group_factory.required_resources.get("CPU", 0)
- gpu = trial.placement_group_factory.required_resources.get("GPU", 0)
- return cpu, gpu
- def _can_fulfill_no_autoscaler(trial: Trial) -> bool:
- """Calculates if there is enough resources for a PENDING trial.
- For no autoscaler case.
- """
- assert trial.status == Trial.PENDING
- asked_cpus, asked_gpus = _get_trial_cpu_and_gpu(trial)
- return asked_cpus <= _get_cluster_resources_no_autoscaler().get(
- "CPU", 0
- ) and asked_gpus <= _get_cluster_resources_no_autoscaler().get("GPU", 0)
- @lru_cache()
- def _get_insufficient_resources_warning_threshold() -> float:
- if _is_ray_cluster():
- return float(
- os.environ.get(
- "TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER", "60"
- )
- )
- else:
- # Set the default to 10s so that we don't prematurely determine that
- # a cluster cannot fulfill the resources requirements.
- # TODO(xwjiang): Change it back once #18608 is resolved.
- return float(os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "60"))
- MSG_TRAIN_START = (
- "Training has not started in the last {wait_time:.0f} seconds. "
- "This could be due to the cluster not having enough resources available. "
- )
- MSG_TRAIN_INSUFFICIENT = (
- "You asked for {asked_cpus} CPUs and {asked_gpus} GPUs, but the cluster only "
- "has {cluster_cpus} CPUs and {cluster_gpus} GPUs available. "
- )
- MSG_TRAIN_END = (
- "Stop the training and adjust the required resources (e.g. via the "
- "`ScalingConfig` or `resources_per_trial`, or `num_workers` for rllib), "
- "or add more resources to your cluster."
- )
- MSG_TUNE_START = (
- "No trial is running and no new trial has been started within "
- "the last {wait_time:.0f} seconds. "
- "This could be due to the cluster not having enough resources available. "
- )
- MSG_TUNE_INSUFFICIENT = (
- "You asked for {asked_cpus} CPUs and {asked_gpus} GPUs per trial, "
- "but the cluster only has {cluster_cpus} CPUs and {cluster_gpus} GPUs available. "
- )
- MSG_TUNE_END = (
- "Stop the tuning and adjust the required resources (e.g. via the "
- "`ScalingConfig` or `resources_per_trial`, or `num_workers` for rllib), "
- "or add more resources to your cluster."
- )
- # TODO(xwjiang): Consider having a help page with more detailed instructions.
- @lru_cache()
- def _get_insufficient_resources_warning_msg(
- for_train: bool = False, trial: Optional[Trial] = None
- ) -> str:
- msg = "Ignore this message if the cluster is autoscaling. "
- if for_train:
- start = MSG_TRAIN_START
- insufficient = MSG_TRAIN_INSUFFICIENT
- end = MSG_TRAIN_END
- else:
- start = MSG_TUNE_START
- insufficient = MSG_TUNE_INSUFFICIENT
- end = MSG_TUNE_END
- msg += start.format(wait_time=_get_insufficient_resources_warning_threshold())
- if trial:
- asked_cpus, asked_gpus = _get_trial_cpu_and_gpu(trial)
- cluster_resources = _get_cluster_resources_no_autoscaler()
- msg += insufficient.format(
- asked_cpus=asked_cpus,
- asked_gpus=asked_gpus,
- cluster_cpus=cluster_resources.get("CPU", 0),
- cluster_gpus=cluster_resources.get("GPU", 0),
- )
- msg += end
- return msg
- class _InsufficientResourcesManager:
- """Insufficient resources manager.
- Makes best effort, conservative guesses about if Tune loop is stuck due to
- infeasible resources. If so, outputs usability messages for users to
- act upon.
- """
- def __init__(self, for_train: bool = False):
- # The information tracked across the life time of Tune loop.
- self._no_running_trials_since = -1
- self._last_trial_num = -1
- self._for_train = for_train
- def on_no_available_trials(self, all_trials):
- """Tracks information across the life of Tune loop and makes guesses
- about if Tune loop is stuck due to infeasible resources.
- If so, outputs certain warning messages.
- The logic should be conservative, non-intrusive and informative.
- For example, rate limiting is applied so that the message is not
- spammy.
- """
- # This is approximately saying we are not making progress.
- if len(all_trials) == self._last_trial_num:
- if self._no_running_trials_since == -1:
- self._no_running_trials_since = time.monotonic()
- elif (
- time.monotonic() - self._no_running_trials_since
- > _get_insufficient_resources_warning_threshold()
- ):
- can_fulfill_any = any(
- trial.status == Trial.PENDING and _can_fulfill_no_autoscaler(trial)
- for trial in all_trials
- )
- if can_fulfill_any:
- # If one trial can be fulfilled, it will be fulfilled eventually
- self._no_running_trials_since = -1
- return
- # Otherwise, can fulfill none
- msg = _get_insufficient_resources_warning_msg(
- for_train=self._for_train, trial=all_trials[0]
- )
- logger.warning(msg)
- self._no_running_trials_since = time.monotonic()
- else:
- self._no_running_trials_since = -1
- self._last_trial_num = len(all_trials)
|