# tuner.py (15 KB)
  1. import logging
  2. import os
  3. from pathlib import Path
  4. from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Type, Union
  5. import pyarrow.fs
  6. import ray
  7. from ray.air._internal.usage import AirEntrypoint
  8. from ray.air.util.node import _force_on_current_node
  9. from ray.train._internal.storage import _exists_at_fs_path, get_fs_and_path
  10. from ray.tune import ResumeConfig, RunConfig
  11. from ray.tune.experimental.output import get_air_verbosity
  12. from ray.tune.impl.tuner_internal import _TUNER_PKL, TunerInternal
  13. from ray.tune.progress_reporter import (
  14. _prepare_progress_reporter_for_ray_client,
  15. _stream_client_output,
  16. )
  17. from ray.tune.result_grid import ResultGrid
  18. from ray.tune.trainable import Trainable
  19. from ray.tune.tune_config import TuneConfig
  20. from ray.util import PublicAPI
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Imported for static type checking only; at runtime the class below refers to
# ``"BaseTrainer"`` through string annotations.
if TYPE_CHECKING:
    from ray.train.base_trainer import BaseTrainer

# Placeholder alias used in annotations for the Ray Client actor handle.
# The real import is kept below for reference but is disabled because it
# breaks lint.
ClientActorHandle = Any

# try:
#    # Breaks lint right now.
#    from ray.util.client.common import ClientActorHandle
# except Exception:
#    pass

# Name of the ``Tuner.__init__`` keyword through which an already constructed
# internal tuner is handed to ``Tuner`` (this is what ``Tuner.restore`` passes).
_TUNER_INTERNAL = "_tuner_internal"
# Key under which ``self`` shows up in a ``locals()`` snapshot of a method.
_SELF = "self"
  33. @PublicAPI(stability="beta")
  34. class Tuner:
  35. """Tuner is the recommended way of launching hyperparameter tuning jobs with Ray Tune.
  36. Args:
  37. trainable: The trainable to be tuned.
  38. param_space: Search space of the tuning job. See :ref:`tune-search-space-tutorial`.
  39. tune_config: Tuning specific configs, such as setting custom
  40. :ref:`search algorithms <tune-search-alg>` and
  41. :ref:`trial scheduling algorithms <tune-schedulers>`.
  42. run_config: Job-level run configuration, which includes configs for
  43. persistent storage, checkpointing, fault tolerance, etc.
  44. Usage pattern:
  45. .. code-block:: python
  46. import ray.tune
  47. def trainable(config):
  48. # Your training logic here
  49. ray.tune.report({"accuracy": 0.8})
  50. tuner = Tuner(
  51. trainable=trainable,
  52. param_space={"lr": ray.tune.grid_search([0.001, 0.01])},
  53. run_config=ray.tune.RunConfig(name="my_tune_run"),
  54. )
  55. results = tuner.fit()
  56. To retry a failed Tune run, you can then do
  57. .. code-block:: python
  58. tuner = Tuner.restore(results.experiment_path, trainable=trainable)
  59. tuner.fit()
  60. ``results.experiment_path`` can be retrieved from the
  61. :ref:`ResultGrid object <tune-analysis-docs>`. It can
  62. also be easily seen in the log output from your first run.
  63. """
  64. # One of the following is assigned.
  65. _local_tuner: Optional[TunerInternal] # Only used in none ray client mode.
  66. _remote_tuner: Optional[ClientActorHandle] # Only used in ray client mode.
  67. def __init__(
  68. self,
  69. trainable: Optional[
  70. Union[str, Callable, Type[Trainable], "BaseTrainer"]
  71. ] = None,
  72. *,
  73. param_space: Optional[Dict[str, Any]] = None,
  74. tune_config: Optional[TuneConfig] = None,
  75. run_config: Optional[RunConfig] = None,
  76. # This is internal only arg.
  77. # Only for dogfooding purposes. We can slowly promote these args
  78. # to RunConfig or TuneConfig as needed.
  79. # TODO(xwjiang): Remove this later.
  80. _tuner_kwargs: Optional[Dict] = None,
  81. _tuner_internal: Optional[TunerInternal] = None,
  82. _entrypoint: AirEntrypoint = AirEntrypoint.TUNER,
  83. ):
  84. """Configure and construct a tune run."""
  85. kwargs = locals().copy()
  86. self._is_ray_client = ray.util.client.ray.is_connected()
  87. if self._is_ray_client:
  88. _run_config = run_config or RunConfig()
  89. if get_air_verbosity(_run_config.verbose) is not None:
  90. logger.info(
  91. "[output] This uses the legacy output and progress reporter, "
  92. "as Ray client is not supported by the new engine. "
  93. "For more information, see "
  94. "https://github.com/ray-project/ray/issues/36949"
  95. )
  96. if _tuner_internal:
  97. if not self._is_ray_client:
  98. self._local_tuner = kwargs[_TUNER_INTERNAL]
  99. else:
  100. self._remote_tuner = kwargs[_TUNER_INTERNAL]
  101. else:
  102. kwargs.pop(_TUNER_INTERNAL, None)
  103. kwargs.pop(_SELF, None)
  104. if not self._is_ray_client:
  105. self._local_tuner = TunerInternal(**kwargs)
  106. else:
  107. self._remote_tuner = _force_on_current_node(
  108. ray.remote(num_cpus=0)(TunerInternal)
  109. ).remote(**kwargs)
  110. @classmethod
  111. def restore(
  112. cls,
  113. path: str,
  114. trainable: Union[str, Callable, Type[Trainable], "BaseTrainer"],
  115. resume_unfinished: bool = True,
  116. resume_errored: bool = False,
  117. restart_errored: bool = False,
  118. param_space: Optional[Dict[str, Any]] = None,
  119. storage_filesystem: Optional[pyarrow.fs.FileSystem] = None,
  120. _resume_config: Optional[ResumeConfig] = None,
  121. ) -> "Tuner":
  122. """Restores Tuner after a previously failed run.
  123. All trials from the existing run will be added to the result table. The
  124. argument flags control how existing but unfinished or errored trials are
  125. resumed.
  126. Finished trials are always added to the overview table. They will not be
  127. resumed.
  128. Unfinished trials can be controlled with the ``resume_unfinished`` flag.
  129. If ``True`` (default), they will be continued. If ``False``, they will
  130. be added as terminated trials (even if they were only created and never
  131. trained).
  132. Errored trials can be controlled with the ``resume_errored`` and
  133. ``restart_errored`` flags. The former will resume errored trials from
  134. their latest checkpoints. The latter will restart errored trials from
  135. scratch and prevent loading their last checkpoints.
  136. .. note::
  137. Restoring an experiment from a path that's pointing to a *different*
  138. location than the original experiment path is supported.
  139. However, Ray Tune assumes that the full experiment directory is available
  140. (including checkpoints) so that it's possible to resume trials from their
  141. latest state.
  142. For example, if the original experiment path was run locally,
  143. then the results are uploaded to cloud storage, Ray Tune expects the full
  144. contents to be available in cloud storage if attempting to resume
  145. via ``Tuner.restore("s3://...")``. The restored run will continue
  146. writing results to the same cloud storage location.
  147. Args:
  148. path: The local or remote path of the experiment directory
  149. for an interrupted or failed run.
  150. Note that an experiment where all trials finished will not be resumed.
  151. This information could be easily located near the end of the
  152. console output of previous run.
  153. trainable: The trainable to use upon resuming the experiment.
  154. This should be the same trainable that was used to initialize
  155. the original Tuner.
  156. param_space: The same `param_space` that was passed to
  157. the original Tuner. This can be optionally re-specified due
  158. to the `param_space` potentially containing Ray object
  159. references (tuning over Datasets or tuning over
  160. several `ray.put` object references). **Tune expects the
  161. `param_space` to be unmodified**, and the only part that
  162. will be used during restore are the updated object references.
  163. Changing the hyperparameter search space then resuming is NOT
  164. supported by this API.
  165. resume_unfinished: If True, will continue to run unfinished trials.
  166. resume_errored: If True, will re-schedule errored trials and try to
  167. restore from their latest checkpoints.
  168. restart_errored: If True, will re-schedule errored trials but force
  169. restarting them from scratch (no checkpoint will be loaded).
  170. storage_filesystem: Custom ``pyarrow.fs.FileSystem``
  171. corresponding to the ``path``. This may be necessary if the original
  172. experiment passed in a custom filesystem.
  173. _resume_config: [Experimental] Config object that controls how to resume
  174. trials of different statuses. Can be used as a substitute to
  175. `resume_*` and `restart_*` flags above.
  176. """
  177. unfinished = (
  178. ResumeConfig.ResumeType.RESUME
  179. if resume_unfinished
  180. else ResumeConfig.ResumeType.SKIP
  181. )
  182. errored = ResumeConfig.ResumeType.SKIP
  183. if resume_errored:
  184. errored = ResumeConfig.ResumeType.RESUME
  185. elif restart_errored:
  186. errored = ResumeConfig.ResumeType.RESTART
  187. resume_config = _resume_config or ResumeConfig(
  188. unfinished=unfinished, errored=errored
  189. )
  190. if not ray.util.client.ray.is_connected():
  191. tuner_internal = TunerInternal(
  192. restore_path=path,
  193. resume_config=resume_config,
  194. trainable=trainable,
  195. param_space=param_space,
  196. storage_filesystem=storage_filesystem,
  197. )
  198. return Tuner(_tuner_internal=tuner_internal)
  199. else:
  200. tuner_internal = _force_on_current_node(
  201. ray.remote(num_cpus=0)(TunerInternal)
  202. ).remote(
  203. restore_path=path,
  204. resume_config=resume_config,
  205. trainable=trainable,
  206. param_space=param_space,
  207. storage_filesystem=storage_filesystem,
  208. )
  209. return Tuner(_tuner_internal=tuner_internal)
  210. @classmethod
  211. def can_restore(
  212. cls,
  213. path: Union[str, os.PathLike],
  214. storage_filesystem: Optional[pyarrow.fs.FileSystem] = None,
  215. ) -> bool:
  216. """Checks whether a given directory contains a restorable Tune experiment.
  217. Usage Pattern:
  218. Use this utility to switch between starting a new Tune experiment
  219. and restoring when possible. This is useful for experiment fault-tolerance
  220. when re-running a failed tuning script.
  221. .. code-block:: python
  222. import os
  223. from ray.tune import Tuner, RunConfig
  224. def train_fn(config):
  225. # Make sure to implement checkpointing so that progress gets
  226. # saved on restore.
  227. pass
  228. name = "exp_name"
  229. storage_path = os.path.expanduser("~/ray_results")
  230. exp_dir = os.path.join(storage_path, name)
  231. if Tuner.can_restore(exp_dir):
  232. tuner = Tuner.restore(
  233. exp_dir,
  234. trainable=train_fn,
  235. resume_errored=True,
  236. )
  237. else:
  238. tuner = Tuner(
  239. train_fn,
  240. run_config=RunConfig(name=name, storage_path=storage_path),
  241. )
  242. tuner.fit()
  243. Args:
  244. path: The path to the experiment directory of the Tune experiment.
  245. This can be either a local directory or a remote URI
  246. (e.g. s3://bucket/exp_name).
  247. Returns:
  248. bool: True if this path exists and contains the Tuner state to resume from
  249. """
  250. fs, fs_path = get_fs_and_path(path, storage_filesystem)
  251. return _exists_at_fs_path(fs, Path(fs_path, _TUNER_PKL).as_posix())
  252. def _prepare_remote_tuner_for_jupyter_progress_reporting(self):
  253. run_config: RunConfig = ray.get(self._remote_tuner.get_run_config.remote())
  254. progress_reporter, string_queue = _prepare_progress_reporter_for_ray_client(
  255. run_config.progress_reporter, run_config.verbose
  256. )
  257. run_config.progress_reporter = progress_reporter
  258. ray.get(
  259. self._remote_tuner.set_run_config_and_remote_string_queue.remote(
  260. run_config, string_queue
  261. )
  262. )
  263. return progress_reporter, string_queue
  264. def fit(self) -> ResultGrid:
  265. """Executes hyperparameter tuning job as configured and returns result.
  266. Failure handling:
  267. For the kind of exception that happens during the execution of a trial,
  268. one may inspect it together with stacktrace through the returned result grid.
  269. See ``ResultGrid`` for reference. Each trial may fail up to a certain number.
  270. This is configured by ``RunConfig.FailureConfig.max_failures``.
  271. Exception that happens beyond trials will be thrown by this method as well.
  272. In such cases, there will be instruction like the following printed out
  273. at the end of console output to inform users on how to resume.
  274. Please use `Tuner.restore` to resume.
  275. .. code-block:: python
  276. import os
  277. from ray.tune import Tuner
  278. trainable = ...
  279. tuner = Tuner.restore(
  280. os.path.expanduser("~/ray_results/tuner_resume"),
  281. trainable=trainable
  282. )
  283. tuner.fit()
  284. Raises:
  285. RayTaskError: If user-provided trainable raises an exception
  286. """
  287. if not self._is_ray_client:
  288. return self._local_tuner.fit()
  289. else:
  290. (
  291. progress_reporter,
  292. string_queue,
  293. ) = self._prepare_remote_tuner_for_jupyter_progress_reporting()
  294. fit_future = self._remote_tuner.fit.remote()
  295. _stream_client_output(
  296. fit_future,
  297. progress_reporter,
  298. string_queue,
  299. )
  300. return ray.get(fit_future)
  301. def get_results(self) -> ResultGrid:
  302. """Get results of a hyperparameter tuning run.
  303. This method returns the same results as :meth:`~ray.tune.Tuner.fit`
  304. and can be used to retrieve the results after restoring a tuner without
  305. calling ``fit()`` again.
  306. If the tuner has not been fit before, an error will be raised.
  307. .. code-block:: python
  308. from ray.tune import Tuner
  309. # `trainable` is what was passed in to the original `Tuner`
  310. tuner = Tuner.restore("/path/to/experiment', trainable=trainable)
  311. results = tuner.get_results()
  312. Returns:
  313. Result grid of a previously fitted tuning run.
  314. """
  315. if not self._is_ray_client:
  316. return self._local_tuner.get_results()
  317. else:
  318. (
  319. progress_reporter,
  320. string_queue,
  321. ) = self._prepare_remote_tuner_for_jupyter_progress_reporting()
  322. get_results_future = self._remote_tuner.get_results.remote()
  323. _stream_client_output(
  324. get_results_future,
  325. progress_reporter,
  326. string_queue,
  327. )
  328. return ray.get(get_results_future)
  329. def __getattribute__(self, item):
  330. if item == "restore":
  331. raise AttributeError(
  332. "`Tuner.restore()` is a classmethod and cannot be called on an "
  333. "instance. Use `tuner = Tuner.restore(...)` to instantiate the "
  334. "Tuner instead."
  335. )
  336. return super().__getattribute__(item)