resource_changing_scheduler.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871
  1. import logging
  2. import pickle
  3. import warnings
  4. from copy import deepcopy
  5. from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Tuple, Union
  6. import numpy as np
  7. from ray.air.execution.resources.request import _sum_bundles
  8. from ray.tune.execution.placement_groups import PlacementGroupFactory
  9. from ray.tune.experiment import Trial
  10. from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler
  11. from ray.util.annotations import PublicAPI
  12. if TYPE_CHECKING:
  13. from ray.tune.execution.tune_controller import TuneController
  14. logger = logging.getLogger(__name__)
  15. @PublicAPI(stability="beta")
  16. class DistributeResources:
  17. """This class creates a basic uniform resource allocation function.
  18. The function naively balances free resources (CPUs and GPUs) between
  19. trials, giving them all equal priority, ensuring that all resources
  20. are always being used. The free resources will be placed in new bundles.
  21. The function assumes that all bundles are equal (there is no "head"
  22. bundle).
  23. If for some reason a trial ends up with
  24. more resources than there are free ones, it will adjust downwards.
  25. It will also ensure that trial as at least as many resources as
  26. it started with (``base_trial_resource``).
  27. The function returns a new ``PlacementGroupFactory`` with updated
  28. resource requirements, or None. If the returned
  29. ``PlacementGroupFactory`` is equal by value to the one the
  30. trial has currently, the scheduler will skip the update process
  31. internally (same with None).
  32. If you wish to implement your own resource distribution logic,
  33. you can do so by extending this class, as it provides several
  34. generic methods. You can also implement a function instead.
  35. Args:
  36. add_bundles: If True, create new bundles from free resources.
  37. Otherwise, spread them among base_trial_resource bundles.
  38. increase_by: A dict with key-value
  39. pairs representing an atomic unit of resources (name-amount)
  40. the trial will be increased by. If not set, the trial will
  41. increase by 1 CPU/GPU.
  42. increase_by_times: If set to >=1 and ``increase_by`` is set,
  43. the trial will increase by maximum of
  44. ``increase_by_times * increase_by`` resources. If set to <1,
  45. no upper limit is set. Ignored if ``increase_by`` is not set.
  46. reserve_resources: A dict of
  47. resource_name-amount pairs representing the resources
  48. that will not be allocated to resized trials.
  49. """
  50. def __init__(
  51. self,
  52. add_bundles: bool = False,
  53. increase_by: Optional[Dict[str, float]] = None,
  54. increase_by_times: int = -1,
  55. reserve_resources: Optional[Dict[str, float]] = None,
  56. ):
  57. self.add_bundles = add_bundles
  58. self.increase_by = increase_by or {}
  59. self.increase_by_times = increase_by_times
  60. self.reserve_resources = reserve_resources or {}
  61. def _validate(
  62. self, base_trial_resource: PlacementGroupFactory, result: Dict[str, Any]
  63. ) -> bool:
  64. """Return False if we should keep the current resources outright."""
  65. if not isinstance(base_trial_resource, PlacementGroupFactory):
  66. raise ValueError(
  67. f"{self.__class__.__name__} only supports PlacementGroupFactories."
  68. )
  69. if not self.add_bundles and len(base_trial_resource.bundles) > 1:
  70. raise ValueError(
  71. "If `add_bundles` is False, the number of bundles in "
  72. "`resources_per_trial` must be 1 "
  73. f"(got {len(base_trial_resource.bundles)})."
  74. )
  75. # Don't bother if this is just the first iteration
  76. if result["training_iteration"] < 1:
  77. return False
  78. return True
  79. def _get_total_available_resources(
  80. self, tune_controller: "TuneController"
  81. ) -> Tuple[float, float]:
  82. """Get the number of CPUs and GPUs avaialble in total (not just free)"""
  83. total_available_cpus = (
  84. tune_controller._resource_updater.get_num_cpus()
  85. - self.reserve_resources.get("CPU", 0)
  86. )
  87. total_available_gpus = (
  88. tune_controller._resource_updater.get_num_gpus()
  89. - self.reserve_resources.get("GPU", 0)
  90. )
  91. return total_available_cpus, total_available_gpus
  92. def _get_used_cpus_and_gpus(self, t: Trial) -> Tuple[float, float]:
  93. """Check how many CPUs and GPUs a trial is using currently"""
  94. return (
  95. t.placement_group_factory.required_resources.get("CPU", 0),
  96. t.placement_group_factory.required_resources.get("GPU", 0),
  97. )
  98. def _get_resources_from_bundles(
  99. self, bundles: List[Dict[str, float]]
  100. ) -> Dict[str, float]:
  101. """Get total sums of resources in bundles"""
  102. if not bundles:
  103. return {"CPU": 0, "GPU": 0}
  104. return _sum_bundles(bundles)
  105. def _is_bundle_empty(self, bundle: Dict[str, float]) -> bool:
  106. return not (bundle.get("CPU", 0) or bundle.get("GPU", 0))
  107. def _add_two_bundles(
  108. self,
  109. bundles_a: List[Dict[str, float]],
  110. bundles_b: List[Dict[str, float]],
  111. increase_by: Dict[str, float],
  112. limit_to_increase_by_times: bool,
  113. max_increase_by_times: int = -1,
  114. ):
  115. """Add two bundles together.
  116. If ``limit_to_increase_by_times`` is True, ``self.increase_by_times`` > 0
  117. and ``max_increase_by_times`` > 0, ensure that the resulting number of
  118. bundles is not above ``min(max_increase_by_times, self.increase_by_times)``.
  119. If ``limit_to_increase_by_times`` is True and ``self.increase_by_times`` > 0,
  120. ensure that the resulting number of bundles is not above
  121. `self.increase_by_times``.
  122. """
  123. if limit_to_increase_by_times:
  124. if max_increase_by_times > 0 and self.increase_by_times > 0:
  125. max_increase_by_times = min(
  126. max_increase_by_times, self.increase_by_times
  127. )
  128. elif self.increase_by_times > 0:
  129. max_increase_by_times = self.increase_by_times
  130. if self.add_bundles:
  131. bundles = [b for b in bundles_a if not self._is_bundle_empty(b)] + [
  132. b for b in bundles_b if not self._is_bundle_empty(b)
  133. ]
  134. if max_increase_by_times > 0:
  135. bundles = bundles[:max_increase_by_times]
  136. else:
  137. bundles_a = bundles_a or [{}]
  138. bundles_b = bundles_b or [{}]
  139. bundles = [
  140. {
  141. "CPU": bundles_a[0].get("CPU", 0) + bundles_b[0].get("CPU", 0),
  142. "GPU": bundles_a[0].get("GPU", 0) + bundles_b[0].get("GPU", 0),
  143. }
  144. ]
  145. if max_increase_by_times > 0:
  146. bundles[0]["CPU"] = min(
  147. bundles[0]["CPU"],
  148. increase_by.get("CPU", 0) * max_increase_by_times,
  149. )
  150. bundles[0]["GPU"] = min(
  151. bundles[0]["GPU"],
  152. increase_by.get("GPU", 0) * max_increase_by_times,
  153. )
  154. return bundles
  155. def _get_multiplier(
  156. self,
  157. increase_by: Dict[str, float],
  158. cpus: float = 0,
  159. gpus: float = 0,
  160. max_multiplier: int = -1,
  161. ) -> int:
  162. """Get how many times ``increase_by`` bundles
  163. occur in ``cpus`` and ``gpus``."""
  164. if increase_by.get("CPU", 0) and increase_by.get("GPU", 0):
  165. multiplier = min(
  166. cpus // increase_by.get("CPU", 0),
  167. gpus // increase_by.get("GPU", 0),
  168. )
  169. elif increase_by.get("GPU", 0):
  170. multiplier = gpus // increase_by.get("GPU", 0)
  171. else:
  172. multiplier = cpus // increase_by.get("CPU", 0)
  173. if max_multiplier > 0 and multiplier > 0:
  174. multiplier = min(max_multiplier, multiplier)
  175. return int(multiplier)
  176. def _remove_bundles(
  177. self,
  178. bundles: List[Dict[str, float]],
  179. increase_by: Dict[str, float],
  180. multiplier: int,
  181. ) -> List[Dict[str, float]]:
  182. """Remove ``multiplier`` ``increase_by`` bundles from ``bundles``."""
  183. multiplier = -abs(multiplier)
  184. if self.add_bundles:
  185. bundles = bundles[:multiplier]
  186. else:
  187. bundles = deepcopy(bundles)
  188. bundles[0]["CPU"] += increase_by.get("CPU", 0) * multiplier
  189. bundles[0]["GPU"] += increase_by.get("GPU", 0) * multiplier
  190. bundles[0]["CPU"] = max(bundles[0]["CPU"], 0)
  191. bundles[0]["GPU"] = max(bundles[0]["GPU"], 0)
  192. return bundles
  193. def _create_new_bundles(
  194. self,
  195. increase_by: Dict[str, float],
  196. multiplier: int,
  197. ) -> List[Dict[str, float]]:
  198. """Create a list of new bundles containing ``increase_by`` * ``multiplier``."""
  199. multiplier = abs(multiplier)
  200. if self.add_bundles:
  201. bundles = [increase_by] * int(multiplier)
  202. else:
  203. bundles = [{}]
  204. bundles[0]["CPU"] = increase_by.get("CPU", 0) * multiplier
  205. bundles[0]["GPU"] = increase_by.get("GPU", 0) * multiplier
  206. return bundles
  207. def _modify_bundles_with_free_resources(
  208. self,
  209. bundles: List[Dict[str, float]],
  210. increase_by: Dict[str, float],
  211. free_cpus: float,
  212. free_gpus: float,
  213. *,
  214. max_multiplier: int = -1,
  215. max_increase_by_times: int = -1,
  216. ):
  217. """Given free resources, increase/decrease the number of bundles in
  218. ``bundles``."""
  219. multiplier = self._get_multiplier(
  220. increase_by, free_cpus, free_gpus, max_multiplier
  221. )
  222. if multiplier < 0:
  223. bundles = self._remove_bundles(bundles, increase_by, multiplier)
  224. elif multiplier > 0:
  225. bundles_to_add = self._create_new_bundles(increase_by, multiplier)
  226. bundles = self._add_two_bundles(
  227. bundles, bundles_to_add, increase_by, True, max_increase_by_times
  228. )
  229. return bundles
  230. def _get_added_bundles(
  231. self, bundles: List[Dict[str, float]], base_bundles: List[Dict[str, float]]
  232. ) -> List[Dict[str, float]]:
  233. """Return the difference between bundles and base_bundles"""
  234. if self.add_bundles:
  235. added_bundles = bundles[len(base_bundles) :]
  236. else:
  237. if not bundles:
  238. bundles = [{"CPU": 0, "GPU": 0}]
  239. if not base_bundles:
  240. base_bundles = [{"CPU": 0, "GPU": 0}]
  241. added_bundles = [
  242. {
  243. "CPU": bundles[0].get("CPU", 0) - base_bundles[0].get("CPU", 0),
  244. "GPU": bundles[0].get("GPU", 0) - base_bundles[0].get("GPU", 0),
  245. }
  246. ]
  247. return added_bundles
  248. def _are_bundles_below_limit(
  249. self,
  250. bundles: List[Dict[str, float]],
  251. base_bundles: Optional[List[Dict[str, float]]] = None,
  252. max_added_cpus: Optional[float] = None,
  253. max_added_gpus: Optional[float] = None,
  254. ):
  255. if not max_added_cpus:
  256. if self.increase_by_times > 0:
  257. max_added_cpus = self.increase_by.get("CPU", 0) * self.increase_by_times
  258. else:
  259. max_added_cpus = np.inf
  260. if not max_added_gpus:
  261. if self.increase_by_times > 0:
  262. max_added_gpus = self.increase_by.get("GPU", 0) * self.increase_by_times
  263. else:
  264. max_added_gpus = np.inf
  265. added_resources = self._get_resources_from_bundles(
  266. self._get_added_bundles(bundles, base_bundles) if base_bundles else bundles
  267. )
  268. ret = (
  269. added_resources.get("CPU", -np.inf) < max_added_cpus
  270. or added_resources.get("GPU", -np.inf) < max_added_gpus
  271. )
  272. return ret
  273. def _get_new_added_bundles(
  274. self,
  275. trial: Trial,
  276. all_trials: List[Trial],
  277. base_bundles: List[Dict[str, float]],
  278. increase_by: Dict[str, float],
  279. total_available_cpus: float,
  280. total_available_gpus: float,
  281. used_cpus: float,
  282. used_gpus: float,
  283. ) -> List[Dict[str, float]]:
  284. """Returns updated added bundles."""
  285. upper_limit_all_trials_bundles = [list() for _ in range(len(all_trials))]
  286. free_cpus = total_available_cpus - used_cpus
  287. free_gpus = total_available_gpus - used_gpus
  288. base_resources = self._get_resources_from_bundles(base_bundles)
  289. upper_limit_cpus_to_distribute = total_available_cpus - (
  290. base_resources.get("CPU", 0) * len(all_trials)
  291. )
  292. upper_limit_gpus_to_distribute = total_available_gpus - (
  293. base_resources.get("GPU", 0) * len(all_trials)
  294. )
  295. max_increase_by_times = 0
  296. # First, calculate upper limits for uniform allocation
  297. # This is done by simulating a clean slate scenario
  298. # The loop runs until all resources are allocated or
  299. # all trials are at their resource limits
  300. i = 0
  301. trials_at_limit = set()
  302. while (
  303. len(trials_at_limit) < len(all_trials)
  304. # we have previously asserted that at least one resource has to be
  305. # bigger than 0
  306. and upper_limit_cpus_to_distribute >= increase_by.get("CPU", 0)
  307. and upper_limit_gpus_to_distribute >= increase_by.get("GPU", 0)
  308. ):
  309. idx = i % len(upper_limit_all_trials_bundles)
  310. old_bundles = deepcopy(upper_limit_all_trials_bundles[idx])
  311. upper_limit_all_trials_bundles[
  312. idx
  313. ] = self._modify_bundles_with_free_resources(
  314. upper_limit_all_trials_bundles[idx],
  315. increase_by,
  316. upper_limit_cpus_to_distribute,
  317. upper_limit_gpus_to_distribute,
  318. max_multiplier=1,
  319. )
  320. added_resources = self._get_resources_from_bundles(
  321. self._get_added_bundles(
  322. upper_limit_all_trials_bundles[idx], old_bundles
  323. )
  324. )
  325. if not added_resources.get("CPU", 0) and not added_resources.get("GPU", 0):
  326. trials_at_limit.add(idx)
  327. elif idx == 0:
  328. max_increase_by_times += 1
  329. upper_limit_cpus_to_distribute -= added_resources.get("CPU", 0)
  330. upper_limit_gpus_to_distribute -= added_resources.get("GPU", 0)
  331. i += 1
  332. # Add new resourcs, but only up to calculated upper limits
  333. # (max_increase_by_times)
  334. return self._modify_bundles_with_free_resources(
  335. self._get_added_bundles(
  336. trial.placement_group_factory.bundles, base_bundles
  337. ),
  338. increase_by,
  339. free_cpus,
  340. free_gpus,
  341. max_increase_by_times=max_increase_by_times,
  342. )
  343. def __call__(
  344. self,
  345. tune_controller: "TuneController",
  346. trial: Trial,
  347. result: Dict[str, Any],
  348. scheduler: "ResourceChangingScheduler",
  349. ) -> Optional[PlacementGroupFactory]:
  350. """Run resource allocation logic.
  351. Returns a new ``PlacementGroupFactory`` with updated
  352. resource requirements, or None. If the returned
  353. ``PlacementGroupFactory`` is equal by value to the one the
  354. trial has currently, the scheduler will skip the update process
  355. internally (same with None).
  356. Args:
  357. tune_controller: Trial runner for this Tune run.
  358. Can be used to obtain information about other trials.
  359. trial: The trial to allocate new resources to.
  360. result: The latest results of trial.
  361. scheduler: The scheduler calling
  362. the function.
  363. """
  364. # Get base trial resources as defined in
  365. # ``tune.run(resources_per_trial)``
  366. base_trial_resource = scheduler.base_trial_resources
  367. if not self._validate(base_trial_resource=base_trial_resource, result=result):
  368. return None
  369. # default values if resources_per_trial is unspecified
  370. if base_trial_resource is None:
  371. base_trial_resource = PlacementGroupFactory([{"CPU": 1, "GPU": 0}])
  372. if self.increase_by:
  373. increase_by = self.increase_by
  374. assert not self._is_bundle_empty(increase_by)
  375. assert increase_by.get("CPU", 0) >= 0 and increase_by.get("GPU", 0) >= 0
  376. elif self.add_bundles:
  377. increase_by = base_trial_resource.bundles[-1]
  378. elif base_trial_resource.bundles[0].get("GPU", 0):
  379. increase_by = {"GPU": 1}
  380. else:
  381. increase_by = {"CPU": 1}
  382. base_bundles = deepcopy(base_trial_resource.bundles)
  383. (
  384. total_available_cpus,
  385. total_available_gpus,
  386. ) = self._get_total_available_resources(tune_controller=tune_controller)
  387. all_trials = tune_controller.get_live_trials()
  388. used_cpus_and_gpus = [self._get_used_cpus_and_gpus(t) for t in all_trials]
  389. used_cpus, used_gpus = zip(*used_cpus_and_gpus)
  390. used_cpus = sum(used_cpus)
  391. used_gpus = sum(used_gpus)
  392. added_bundles = self._get_new_added_bundles(
  393. trial,
  394. all_trials,
  395. base_bundles,
  396. increase_by,
  397. total_available_cpus,
  398. total_available_gpus,
  399. used_cpus,
  400. used_gpus,
  401. )
  402. new_bundles = self._add_two_bundles(
  403. base_bundles, added_bundles, increase_by, False
  404. )
  405. pgf = PlacementGroupFactory(
  406. new_bundles,
  407. strategy=base_trial_resource.strategy,
  408. *base_trial_resource._args,
  409. **base_trial_resource._kwargs,
  410. )
  411. pgf._head_bundle_is_empty = base_trial_resource._head_bundle_is_empty
  412. return pgf
  413. @PublicAPI(stability="beta")
  414. class DistributeResourcesToTopJob(DistributeResources):
  415. """This class creates a "TopJob" resource allocation function.
  416. The function will assign all of the free resources to the best
  417. performing trial (as defined by ``metric`` and ``mode``). The
  418. previous best trials will not have their resources deallocated,
  419. unless in the case outlined below.
  420. If for some reason a trial ends up with
  421. more resources than there are free ones, it will adjust downwards.
  422. It will also ensure that trial as at least as many resources as
  423. it started with (``base_trial_resource``).
  424. The function returns a new ``PlacementGroupFactory`` with updated
  425. resource requirements, or None. If the returned
  426. ``PlacementGroupFactory`` is equal by value to the one the
  427. trial has currently, the scheduler will skip the update process
  428. internally (same with None).
  429. Args:
  430. add_bundles: If True, create new bundles from free resources.
  431. Otherwise, spread them among base_trial_resource bundles.
  432. increase_by: A dict with key-value
  433. pairs representing an atomic unit of resources (name-amount)
  434. the trial will be increased by. If not set, the trial will
  435. increase by 1 CPU/GPU.
  436. increase_by_times: If set to >=1 and ``increase_by`` is set,
  437. the trial will increase by maximum of
  438. ``increase_by_times * increase_by`` resources. If set to <1,
  439. no upper limit is set. Ignored if ``increase_by`` is not set.
  440. reserve_resources: A dict of
  441. resource_name-amount pairs representing the resources
  442. that will not be allocated to resized trials.
  443. is that the attribute should increase monotonically.
  444. metric: The training result objective value attribute. Stopping
  445. procedures will use this attribute. If None, will use the metric
  446. of the scheduler.
  447. mode: One of {min, max}. Determines whether objective is
  448. minimizing or maximizing the metric attribute. If None, will use the metric
  449. of the scheduler.
  450. """
  451. def __init__(
  452. self,
  453. add_bundles: bool = False,
  454. increase_by: Optional[Dict[str, float]] = None,
  455. increase_by_times: int = -1,
  456. reserve_resources: Optional[Dict[str, float]] = None,
  457. metric: Optional[str] = None,
  458. mode: Optional[str] = None,
  459. ):
  460. super().__init__(add_bundles, increase_by, increase_by_times, reserve_resources)
  461. self.metric = metric
  462. self.mode = mode
  463. @property
  464. def _metric_op(self) -> float:
  465. if self.mode not in ("min", "max"):
  466. raise ValueError("The mode parameter can only be either min or max.")
  467. if self.mode == "max":
  468. return 1.0
  469. return -1.0
  470. def _get_new_added_bundles(
  471. self,
  472. trial: Trial,
  473. all_trials: List[Trial],
  474. base_bundles: List[Dict[str, float]],
  475. increase_by: Dict[str, float],
  476. total_available_cpus: float,
  477. total_available_gpus: float,
  478. used_cpus: float,
  479. used_gpus: float,
  480. ) -> List[Dict[str, float]]:
  481. if self.metric is None:
  482. raise ValueError(
  483. "The metric parameter cannot be None. The parameter can be set in "
  484. "either `DistributeResourcesToTopJob`, the base scheduler or in "
  485. "`tune.TuneConfig()` (highest to lowest priority)."
  486. )
  487. free_cpus = total_available_cpus - used_cpus
  488. free_gpus = total_available_gpus - used_gpus
  489. sorted_trials = sorted(
  490. all_trials,
  491. key=lambda t: -self._metric_op * t.last_result.get(self.metric, np.inf),
  492. )
  493. added_bundles = self._get_added_bundles(
  494. trial.placement_group_factory.bundles, base_bundles
  495. )
  496. best_trial = next(
  497. (
  498. t
  499. for t in sorted_trials
  500. if self._are_bundles_below_limit(
  501. t.placement_group_factory.bundles, base_bundles
  502. )
  503. ),
  504. sorted_trials[0],
  505. )
  506. if (
  507. trial.trial_id != best_trial.trial_id
  508. # Only reduce resources here
  509. and self._get_multiplier(increase_by, free_cpus, free_gpus) >= 0
  510. ):
  511. return added_bundles
  512. return self._modify_bundles_with_free_resources(
  513. added_bundles,
  514. increase_by,
  515. free_cpus,
  516. free_gpus,
  517. )
  518. _DistributeResourcesDefault = DistributeResources(add_bundles=False)
  519. _DistributeResourcesDistributedDefault = DistributeResources(add_bundles=True)
  520. @PublicAPI(stability="beta")
  521. class ResourceChangingScheduler(TrialScheduler):
  522. """A utility scheduler to dynamically change resources of live trials.
  523. .. versionadded:: 1.5.0
  524. .. note::
  525. Experimental. API may change in future releases.
  526. The ResourceChangingScheduler works by wrapping around any other
  527. scheduler and adjusting the resource requirements of live trials
  528. in response to the decisions of the wrapped scheduler
  529. through a user-specified ``resources_allocation_function``.
  530. An example of such a function can be found in
  531. :doc:`/tune/examples/includes/xgboost_dynamic_resources_example`.
  532. If the functional API is used, the current trial resources can be obtained
  533. by calling `tune.get_trial_resources()` inside the training function.
  534. The function should be able to
  535. :ref:`load and save checkpoints <tune-function-trainable-checkpointing>`
  536. (the latter preferably every iteration).
  537. If the Trainable (class) API is used, you can obtain the current trial
  538. resources through the ``Trainable.trial_resources`` property.
  539. Cannot be used if ``reuse_actors`` is True in ``tune.TuneConfig()``. A ValueError
  540. will be raised in that case.
  541. Args:
  542. base_scheduler: The scheduler to provide decisions
  543. about trials. If None, a default FIFOScheduler will be used.
  544. resources_allocation_function: The callable used to change
  545. live trial resource requiements during tuning. This callable
  546. will be called on each trial as it finishes one step of training.
  547. The callable must take four arguments: ``TrialRunner``, current
  548. ``Trial``, current result :class:`dict` and the
  549. ``ResourceChangingScheduler`` calling it. The callable must
  550. return a ``PlacementGroupFactory``
  551. or None (signifying no need for an update). If
  552. ``resources_allocation_function`` is None, no resource
  553. requirements will be changed at any time.
  554. By default, :class:`DistributeResources` will be used,
  555. distributing available CPUs and GPUs over all running trials
  556. in a robust way, without any prioritization.
  557. Warning:
  558. If the ``resources_allocation_function`` sets trial resource
  559. requirements to values bigger than possible, the trial will
  560. not run. Ensure that your callable accounts for that possibility
  561. by setting upper limits. Consult :class:`DistributeResources`
  562. to see how that may be done.
  563. Example:
  564. .. code-block:: python
  565. base_scheduler = ASHAScheduler(max_t=16)
  566. def my_resources_allocation_function(
  567. tune_controller: "TuneController",
  568. trial: Trial,
  569. result: Dict[str, Any],
  570. scheduler: "ResourceChangingScheduler"
  571. ) -> Optional[Union[PlacementGroupFactory, Resource]]:
  572. # logic here
  573. # usage of PlacementGroupFactory is strongly preferred
  574. return PlacementGroupFactory(...)
  575. scheduler = ResourceChangingScheduler(
  576. base_scheduler,
  577. my_resources_allocation_function
  578. )
  579. See :doc:`/tune/examples/includes/xgboost_dynamic_resources_example` for a
  580. more detailed example.
  581. """
  582. def __init__(
  583. self,
  584. base_scheduler: Optional[TrialScheduler] = None,
  585. resources_allocation_function: Optional[
  586. Callable[
  587. [
  588. "TuneController",
  589. Trial,
  590. Dict[str, Any],
  591. "ResourceChangingScheduler",
  592. ],
  593. Optional[PlacementGroupFactory],
  594. ]
  595. ] = _DistributeResourcesDefault,
  596. ) -> None:
  597. super().__init__()
  598. if resources_allocation_function is None:
  599. warnings.warn(
  600. "`resources_allocation_function` is None. No resource "
  601. "requirements will be changed at any time. Pass a "
  602. "correctly defined function to enable functionality."
  603. )
  604. self._resources_allocation_function = resources_allocation_function
  605. self._base_scheduler = base_scheduler or FIFOScheduler()
  606. self._base_trial_resources: Optional[PlacementGroupFactory] = None
  607. self._trials_to_reallocate: Dict[
  608. Trial, Optional[Union[dict, PlacementGroupFactory]]
  609. ] = {}
  610. self._reallocated_trial_ids: Set[str] = set()
  611. self._metric = None
  612. self._mode = None
  613. @property
  614. def metric(self):
  615. return self._base_scheduler._metric
  616. @property
  617. def base_trial_resources(self) -> Optional[PlacementGroupFactory]:
  618. return self._base_trial_resources
  619. def set_search_properties(
  620. self, metric: Optional[str], mode: Optional[str], **spec
  621. ) -> bool:
  622. self._metric = metric
  623. self._mode = mode
  624. return self._base_scheduler.set_search_properties(metric, mode, **spec)
  625. def on_trial_add(self, tune_controller: "TuneController", trial: Trial, **kwargs):
  626. # use the first trial resources as the base
  627. if self._base_trial_resources is None:
  628. self._base_trial_resources = trial.placement_group_factory
  629. # Raise error if the resources of a newly added trial don't match
  630. # base resources, but allow trials that have already had their
  631. # resources changed by ResourceChangingScheduler
  632. # (those can be added again during loading from a checkpoint)
  633. elif trial.trial_id not in self._reallocated_trial_ids:
  634. trial_resources = trial.placement_group_factory
  635. if trial_resources != self._base_trial_resources:
  636. raise RuntimeError(
  637. "ResourceChangingScheduler doesn't support trials with "
  638. "varying base resources. First trial had "
  639. f"{self._base_trial_resources}, trial {trial} has "
  640. f"{trial_resources}."
  641. )
  642. return self._base_scheduler.on_trial_add(tune_controller, trial, **kwargs)
  643. def on_trial_error(self, tune_controller: "TuneController", trial: Trial, **kwargs):
  644. return self._base_scheduler.on_trial_error(tune_controller, trial, **kwargs)
  645. def on_trial_result(
  646. self, tune_controller: "TuneController", trial: Trial, result: Dict
  647. ) -> str:
  648. base_scheduler_decision = self._base_scheduler.on_trial_result(
  649. tune_controller, trial, result
  650. )
  651. if base_scheduler_decision == TrialScheduler.CONTINUE:
  652. new_resources = self.reallocate_trial_resources_if_needed(
  653. tune_controller, trial, result
  654. )
  655. if new_resources:
  656. self._trials_to_reallocate[trial] = new_resources
  657. return TrialScheduler.PAUSE
  658. return base_scheduler_decision
  659. def on_trial_complete(
  660. self,
  661. tune_controller: "TuneController",
  662. trial: Trial,
  663. result: Dict,
  664. **kwargs,
  665. ):
  666. return self._base_scheduler.on_trial_complete(
  667. tune_controller, trial, result, **kwargs
  668. )
  669. def on_trial_remove(
  670. self, tune_controller: "TuneController", trial: Trial, **kwargs
  671. ):
  672. return self._base_scheduler.on_trial_remove(tune_controller, trial, **kwargs)
  673. def choose_trial_to_run(
  674. self, tune_controller: "TuneController", **kwargs
  675. ) -> Optional[Trial]:
  676. if getattr(tune_controller, "_reuse_actors", False):
  677. raise ValueError(
  678. "ResourceChangingScheduler cannot be used with "
  679. "`reuse_actors=True`. FIX THIS by setting "
  680. "`reuse_actors=False` in `tune.TuneConfig()`."
  681. )
  682. any_resources_changed = False
  683. new_trials_to_reallocate = {}
  684. for trial, new_resources in self._trials_to_reallocate.items():
  685. if trial.status == Trial.RUNNING:
  686. new_trials_to_reallocate[trial] = new_resources
  687. logger.debug(f"{trial} is still running, skipping for now")
  688. continue
  689. any_resources_changed = any_resources_changed or self.set_trial_resources(
  690. trial, new_resources
  691. )
  692. self._trials_to_reallocate = new_trials_to_reallocate
  693. trial = self._base_scheduler.choose_trial_to_run(tune_controller, **kwargs)
  694. return trial
  695. def debug_string(self) -> str:
  696. return "(ResourceChangingScheduler) " f"{self._base_scheduler.debug_string()}"
  697. def save(self, checkpoint_path: str):
  698. save_object = self.__dict__
  699. with open(checkpoint_path, "wb") as outputFile:
  700. pickle.dump(save_object, outputFile)
  701. def restore(self, checkpoint_path: str):
  702. with open(checkpoint_path, "rb") as inputFile:
  703. save_object = pickle.load(inputFile)
  704. self.__dict__.update(save_object)
  705. def set_trial_resources(
  706. self, trial: Trial, new_resources: Union[Dict, PlacementGroupFactory]
  707. ) -> bool:
  708. """Returns True if new_resources were set."""
  709. if new_resources:
  710. logger.info(
  711. f"Setting trial {trial} resource to {new_resources} "
  712. f"with {new_resources._bundles}"
  713. )
  714. trial.placement_group_factory = None
  715. trial.update_resources(new_resources)
  716. # keep track of all trials which had their resources changed
  717. self._reallocated_trial_ids.add(trial.trial_id)
  718. return True
  719. return False
  720. def _are_resources_the_same(
  721. self,
  722. trial: Trial,
  723. new_resources,
  724. ) -> bool:
  725. """Returns True if trial's resources are value equal to new_resources.
  726. Only checks for PlacementGroupFactories at this moment.
  727. """
  728. if (
  729. isinstance(new_resources, PlacementGroupFactory)
  730. and trial.placement_group_factory == new_resources
  731. ):
  732. logger.debug(
  733. f"{trial} PGF "
  734. f"{trial.placement_group_factory.required_resources}"
  735. f" and {new_resources.required_resources}"
  736. f" are the same, skipping"
  737. )
  738. return True
  739. else:
  740. return False
  741. def reallocate_trial_resources_if_needed(
  742. self, tune_controller: "TuneController", trial: Trial, result: Dict
  743. ) -> Optional[Union[dict, PlacementGroupFactory]]:
  744. """Calls user defined resources_allocation_function. If the returned
  745. resources are not none and not the same as currently present, returns
  746. them. Otherwise, returns None."""
  747. if self._resources_allocation_function is None:
  748. return None
  749. if not getattr(self._resources_allocation_function, "metric", None):
  750. self._resources_allocation_function.metric = getattr(
  751. self._base_scheduler, "_metric", self._metric
  752. )
  753. if not getattr(self._resources_allocation_function, "mode", None):
  754. self._resources_allocation_function.mode = getattr(
  755. self._base_scheduler, "_mode", self._mode
  756. )
  757. new_resources = self._resources_allocation_function(
  758. tune_controller, trial, result, self
  759. )
  760. # if we can check if the new resources are the same,
  761. # we do that here and skip resource allocation
  762. if new_resources and not self._are_resources_the_same(trial, new_resources):
  763. return new_resources
  764. return None