# runner_group.py
  1. import abc
  2. import logging
  3. from typing import (
  4. TYPE_CHECKING,
  5. Any,
  6. Callable,
  7. Dict,
  8. List,
  9. Optional,
  10. Tuple,
  11. TypeVar,
  12. Union,
  13. )
  14. import ray
  15. from ray.actor import ActorHandle
  16. from ray.exceptions import RayActorError
  17. from ray.rllib.core import (
  18. COMPONENT_ENV_TO_MODULE_CONNECTOR,
  19. COMPONENT_LEARNER,
  20. COMPONENT_MODULE_TO_ENV_CONNECTOR,
  21. COMPONENT_RL_MODULE,
  22. )
  23. from ray.rllib.core.learner.learner_group import LearnerGroup
  24. from ray.rllib.utils.actor_manager import FaultTolerantActorManager
  25. from ray.rllib.utils.metrics import NUM_ENV_STEPS_SAMPLED_LIFETIME, WEIGHTS_SEQ_NO
  26. from ray.rllib.utils.runners.runner import Runner
  27. from ray.rllib.utils.typing import PolicyID
  28. from ray.util.annotations import DeveloperAPI
  29. if TYPE_CHECKING:
  30. from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
  31. logger = logging.getLogger(__name__)
  32. # Generic type var for `foreach_*` methods.
  33. T = TypeVar("T")
  34. @DeveloperAPI
  35. class RunnerGroup(metaclass=abc.ABCMeta):
  36. def __init__(
  37. self,
  38. config: "AlgorithmConfig",
  39. # TODO (simon): Check, if this is needed. Derived classes could define
  40. # this if needed.
  41. # default_policy_class: Optional[Type[Policy]]
  42. local_runner: Optional[bool] = False,
  43. logdir: Optional[str] = None,
  44. # TODO (simon): Check, if still needed.
  45. tune_trial_id: Optional[str] = None,
  46. pg_offset: int = 0,
  47. _setup: bool = True,
  48. **kwargs: Dict[str, Any],
  49. ) -> None:
  50. # TODO (simon): Remove when old stack is deprecated.
  51. self.config: AlgorithmConfig = (
  52. AlgorithmConfig.from_dict(config)
  53. if isinstance(config, dict)
  54. else (config or AlgorithmConfig())
  55. )
  56. self._remote_config = config
  57. self._remote_config_obj_ref = ray.put(self._remote_config)
  58. self._tune_trial_id = tune_trial_id
  59. self._pg_offset = pg_offset
  60. self._logdir = logdir
  61. self._worker_manager = FaultTolerantActorManager(
  62. max_remote_requests_in_flight_per_actor=self._max_requests_in_flight_per_runner,
  63. init_id=1,
  64. )
  65. if _setup:
  66. try:
  67. self._setup(
  68. config=config,
  69. num_runners=self.num_runners,
  70. local_runner=local_runner,
  71. **kwargs,
  72. )
  73. # `RunnerGroup` creation possibly fails, if some (remote) workers cannot
  74. # be initialized properly (due to some errors in the `Runners`'s
  75. # constructor).
  76. except RayActorError as e:
  77. # In case of an actor (remote worker) init failure, the remote worker
  78. # may still exist and will be accessible, however, e.g. calling
  79. # its `run.remote()` would result in strange "property not found"
  80. # errors.
  81. if e.actor_init_failed:
  82. # Raise the original error here that the `Runners` raised
  83. # during its construction process. This is to enforce transparency
  84. # for the user (better to understand the real reason behind the
  85. # failure).
  86. # - e.args[0]: The `RayTaskError` (inside the caught `RayActorError`).
  87. # - e.args[0].args[2]: The original `Exception` (e.g. a `ValueError` due
  88. # to a config mismatch) thrown inside the actor.
  89. raise e.args[0].args[2]
  90. # In any other case, raise the `RayActorError` as-is.
  91. else:
  92. raise e
  93. def _setup(
  94. self,
  95. *,
  96. config: Optional["AlgorithmConfig"] = None,
  97. num_runners: int = 0,
  98. local_runner: Optional[bool] = False,
  99. validate: Optional[bool] = None,
  100. **kwargs: Dict[str, Any],
  101. ) -> None:
  102. # TODO (simon): Deprecate this as soon as we are deprecating the old stack.
  103. self._local_runner = None
  104. if num_runners == 0:
  105. local_runner = True
  106. self.__local_config = config
  107. # Create a number of @ray.remote workers.
  108. self.add_runners(
  109. num_runners,
  110. validate=validate
  111. if validate is not None
  112. else self._validate_runners_after_construction,
  113. **kwargs,
  114. )
  115. if local_runner:
  116. self._local_runner = self._make_runner(
  117. runner_index=0,
  118. num_runners=num_runners,
  119. config=self._local_config,
  120. **kwargs,
  121. )
  122. def add_runners(self, num_runners: int, validate: bool = False, **kwargs) -> None:
  123. """Creates and adds a number of remote runners to this runner set."""
  124. old_num_runners = self._worker_manager.num_actors()
  125. new_runners = [
  126. self._make_runner(
  127. runner_index=old_num_runners + i + 1,
  128. num_runners=old_num_runners + num_runners,
  129. # `self._remote_config` can be large and it's best practice to
  130. # pass it by reference instead of value
  131. # (https://docs.ray.io/en/latest/ray-core/patterns/pass-large-arg-by-value.html) # noqa
  132. config=self._remote_config_obj_ref,
  133. **kwargs,
  134. )
  135. for i in range(num_runners)
  136. ]
  137. # Add the new workers to the worker manager.
  138. self._worker_manager.add_actors(new_runners)
  139. # Validate here, whether all remote workers have been constructed properly
  140. # and are "up and running". Establish initial states.
  141. if validate:
  142. self.validate()
  143. def validate(self) -> Exception:
  144. for result in self._worker_manager.foreach_actor(lambda w: w.assert_healthy()):
  145. # Simiply raise the error, which will get handled by the try-except
  146. # clause around the _setup().
  147. if not result.ok:
  148. e = result.get()
  149. if self._ignore_ray_errors_on_runners:
  150. logger.error(
  151. f"Validation of {self.runner_cls.__name__} failed! Error={str(e)}"
  152. )
  153. else:
  154. raise e
  155. def _make_runner(
  156. self,
  157. *,
  158. runner_index: int,
  159. num_runners: int,
  160. recreated_runner: bool = False,
  161. config: "AlgorithmConfig",
  162. **kwargs,
  163. ) -> ActorHandle:
  164. # TODO (simon): Change this in the `EnvRunner` API
  165. # to `runner_*`.
  166. kwargs = dict(
  167. config=config,
  168. worker_index=runner_index,
  169. num_workers=num_runners,
  170. recreated_worker=recreated_runner,
  171. log_dir=self._logdir,
  172. tune_trial_id=self._tune_trial_id,
  173. **kwargs,
  174. )
  175. # If a local runner is requested just return a runner instance.
  176. if runner_index == 0:
  177. return self.runner_cls(**kwargs)
  178. # Otherwise define a bundle index and schedule the remote worker.
  179. pg_bundle_idx = (
  180. -1
  181. if ray.util.get_current_placement_group() is None
  182. else self._pg_offset + runner_index
  183. )
  184. return (
  185. ray.remote(**self._remote_args)(self.runner_cls)
  186. .options(placement_group_bundle_index=pg_bundle_idx)
  187. .remote(**kwargs)
  188. )
  189. def sync_runner_states(
  190. self,
  191. *,
  192. config: "AlgorithmConfig",
  193. from_runner: Optional[Runner] = None,
  194. env_steps_sampled: Optional[int] = None,
  195. connector_states: Optional[List[Dict[str, Any]]] = None,
  196. rl_module_state: Optional[Dict[str, Any]] = None,
  197. runner_indices_to_update: Optional[List[int]] = None,
  198. env_to_module=None,
  199. module_to_env=None,
  200. **kwargs,
  201. ):
  202. """Synchronizes the connectors of this `RunnerGroup`'s `Runner`s."""
  203. # If no `Runner` is passed in synchronize through the local `Runner`.
  204. from_runner = from_runner or self.local_runner
  205. merge = config.merge_runner_states or (
  206. config.merge_runner_states == "training_only" and config.in_evaluation
  207. )
  208. broadcast = config.broadcast_runner_states
  209. # Early out if the number of (healthy) remote workers is 0. In this case, the
  210. # local worker is the only operating worker and thus of course always holds
  211. # the reference connector state.
  212. if self.num_healthy_remote_runners == 0 and self.local_runner:
  213. self.local_runner.set_state(
  214. {
  215. **(
  216. {NUM_ENV_STEPS_SAMPLED_LIFETIME: env_steps_sampled}
  217. if env_steps_sampled is not None
  218. else {}
  219. ),
  220. **(rl_module_state or {}),
  221. }
  222. )
  223. # Also early out, if we don't merge AND don't broadcast.
  224. if not merge and not broadcast:
  225. return
  226. # Use states from all remote `Runner`s.
  227. if merge:
  228. if connector_states == []:
  229. runner_states = {}
  230. else:
  231. if connector_states is None:
  232. connector_states = self.foreach_runner(
  233. lambda w: w.get_state(
  234. components=[
  235. COMPONENT_ENV_TO_MODULE_CONNECTOR,
  236. COMPONENT_MODULE_TO_ENV_CONNECTOR,
  237. ]
  238. ),
  239. local_runner=False,
  240. timeout_seconds=(
  241. config.sync_filters_on_rollout_workers_timeout_s
  242. ),
  243. )
  244. env_to_module_states = [
  245. s[COMPONENT_ENV_TO_MODULE_CONNECTOR]
  246. for s in connector_states
  247. if COMPONENT_ENV_TO_MODULE_CONNECTOR in s
  248. ]
  249. module_to_env_states = [
  250. s[COMPONENT_MODULE_TO_ENV_CONNECTOR]
  251. for s in connector_states
  252. if COMPONENT_MODULE_TO_ENV_CONNECTOR in s
  253. ]
  254. if (
  255. self.local_runner is not None
  256. and hasattr(self.local_runner, "_env_to_module")
  257. and hasattr(self.local_runner, "_module_to_env")
  258. ):
  259. assert env_to_module is None
  260. env_to_module = self.local_runner._env_to_module
  261. assert module_to_env is None
  262. module_to_env = self.local_runner._module_to_env
  263. runner_states = {}
  264. if env_to_module_states:
  265. runner_states.update(
  266. {
  267. COMPONENT_ENV_TO_MODULE_CONNECTOR: (
  268. env_to_module.merge_states(env_to_module_states)
  269. ),
  270. }
  271. )
  272. if module_to_env_states:
  273. runner_states.update(
  274. {
  275. COMPONENT_MODULE_TO_ENV_CONNECTOR: (
  276. module_to_env.merge_states(module_to_env_states)
  277. ),
  278. }
  279. )
  280. # Ignore states from remote `Runner`s (use the current `from_worker` states
  281. # only).
  282. else:
  283. if from_runner is None:
  284. runner_states = {
  285. COMPONENT_ENV_TO_MODULE_CONNECTOR: env_to_module.get_state(),
  286. COMPONENT_MODULE_TO_ENV_CONNECTOR: module_to_env.get_state(),
  287. }
  288. else:
  289. runner_states = from_runner.get_state(
  290. components=[
  291. COMPONENT_ENV_TO_MODULE_CONNECTOR,
  292. COMPONENT_MODULE_TO_ENV_CONNECTOR,
  293. ]
  294. )
  295. # Update the global number of environment steps, if necessary.
  296. # Make sure to divide by the number of env runners (such that each `Runner`
  297. # knows (roughly) its own(!) lifetime count and can infer the global lifetime
  298. # count from it).
  299. if env_steps_sampled is not None:
  300. runner_states[NUM_ENV_STEPS_SAMPLED_LIFETIME] = env_steps_sampled // (
  301. config.num_runners or 1
  302. )
  303. # If we do NOT want remote `Runner`s to get their Connector states updated,
  304. # only update the local worker here (with all state components, except the model
  305. # weights) and then remove the connector components.
  306. if not broadcast:
  307. if self.local_runner is not None:
  308. self.local_runner.set_state(runner_states)
  309. else:
  310. env_to_module.set_state(
  311. runner_states.get(COMPONENT_ENV_TO_MODULE_CONNECTOR), {}
  312. )
  313. module_to_env.set_state(
  314. runner_states.get(COMPONENT_MODULE_TO_ENV_CONNECTOR), {}
  315. )
  316. runner_states.pop(COMPONENT_ENV_TO_MODULE_CONNECTOR, None)
  317. runner_states.pop(COMPONENT_MODULE_TO_ENV_CONNECTOR, None)
  318. # If there are components in the state left -> Update remote workers with these
  319. # state components (and maybe the local worker, if it hasn't been updated yet).
  320. if runner_states:
  321. # Update the local `Runner`, but NOT with the weights. If used at all for
  322. # evaluation (through the user calling `self.evaluate`), RLlib would update
  323. # the weights up front either way.
  324. if self.local_runner is not None and broadcast:
  325. self.local_runner.set_state(runner_states)
  326. # Send the model weights only to remote `Runner`s.
  327. # In case the local `Runner` is ever needed for evaluation,
  328. # RLlib updates its weight right before such an eval step.
  329. if rl_module_state:
  330. runner_states.update(rl_module_state)
  331. # Broadcast updated states back to all workers.
  332. self.foreach_runner(
  333. "set_state", # Call the `set_state()` remote method.
  334. kwargs=dict(state=runner_states),
  335. remote_worker_ids=runner_indices_to_update,
  336. local_runner=False,
  337. timeout_seconds=0.0, # This is a state update -> Fire-and-forget.
  338. )
  339. def sync_weights(
  340. self,
  341. policies: Optional[List[PolicyID]] = None,
  342. from_worker_or_learner_group: Optional[Union[Runner, "LearnerGroup"]] = None,
  343. to_worker_indices: Optional[List[int]] = None,
  344. timeout_seconds: Optional[float] = 0.0,
  345. inference_only: Optional[bool] = False,
  346. **kwargs,
  347. ) -> None:
  348. """Syncs model weights from the given weight source to all remote workers.
  349. Weight source can be either a (local) rollout worker or a learner_group. It
  350. should just implement a `get_weights` method.
  351. Args:
  352. policies: Optional list of PolicyIDs to sync weights for.
  353. If None (default), sync weights to/from all policies.
  354. from_worker_or_learner_group: Optional (local) `Runner` instance or
  355. LearnerGroup instance to sync from. If None (default),
  356. sync from this `Runner`Group's local worker.
  357. to_worker_indices: Optional list of worker indices to sync the
  358. weights to. If None (default), sync to all remote workers.
  359. global_vars: An optional global vars dict to set this
  360. worker to. If None, do not update the global_vars.
  361. timeout_seconds: Timeout in seconds to wait for the sync weights
  362. calls to complete. Default is 0.0 (fire-and-forget, do not wait
  363. for any sync calls to finish). Setting this to 0.0 might significantly
  364. improve algorithm performance, depending on the algo's `training_step`
  365. logic.
  366. inference_only: Sync weights with workers that keep inference-only
  367. modules. This is needed for algorithms in the new stack that
  368. use inference-only modules. In this case only a part of the
  369. parameters are synced to the workers. Default is False.
  370. """
  371. if self.local_runner is None and from_worker_or_learner_group is None:
  372. raise TypeError(
  373. "No `local_runner` in `RunnerGroup`! Must provide "
  374. "`from_worker_or_learner_group` arg in `sync_weights()`!"
  375. )
  376. # Only sync if we have remote workers or `from_worker_or_trainer` is provided.
  377. rl_module_state = None
  378. if self.num_remote_runners or from_worker_or_learner_group is not None:
  379. weights_src = (
  380. from_worker_or_learner_group
  381. if from_worker_or_learner_group is not None
  382. else self.local_runner
  383. )
  384. if weights_src is None:
  385. raise ValueError(
  386. "`from_worker_or_trainer` is None. In this case, `RunnerGroup`^ "
  387. "should have `local_runner`. But `local_runner` is also `None`."
  388. )
  389. modules = (
  390. [COMPONENT_RL_MODULE + "/" + p for p in policies]
  391. if policies is not None
  392. else [COMPONENT_RL_MODULE]
  393. )
  394. # LearnerGroup has a Learner, which has an RLModule.
  395. if isinstance(weights_src, LearnerGroup):
  396. rl_module_state = weights_src.get_state(
  397. components=[COMPONENT_LEARNER + "/" + m for m in modules],
  398. inference_only=inference_only,
  399. )[COMPONENT_LEARNER]
  400. # `Runner` (new API stack).
  401. else:
  402. # Runner (remote) has a RLModule.
  403. # TODO (sven): Replace this with a new ActorManager API:
  404. # try_remote_request_till_success("get_state") -> tuple(int,
  405. # remoteresult)
  406. # `weights_src` could be the ActorManager, then. Then RLlib would know
  407. # that it has to ping the manager to try all healthy actors until the
  408. # first returns something.
  409. if isinstance(weights_src, ActorHandle):
  410. rl_module_state = ray.get(
  411. weights_src.get_state.remote(
  412. components=modules,
  413. inference_only=inference_only,
  414. )
  415. )
  416. # `Runner` (local) has an RLModule.
  417. else:
  418. rl_module_state = weights_src.get_state(
  419. components=modules,
  420. inference_only=inference_only,
  421. )
  422. # Make sure `rl_module_state` only contains the weights and the
  423. # weight seq no, nothing else.
  424. rl_module_state = {
  425. k: v
  426. for k, v in rl_module_state.items()
  427. if k in [COMPONENT_RL_MODULE, WEIGHTS_SEQ_NO]
  428. }
  429. # Move weights to the object store to avoid having to make n pickled
  430. # copies of the weights dict for each worker.
  431. rl_module_state_ref = ray.put(rl_module_state)
  432. # Sync to specified remote workers in this `Runner`Group.
  433. self.foreach_runner(
  434. func="set_state",
  435. kwargs=dict(state=rl_module_state_ref),
  436. local_runner=False, # Do not sync back to local worker.
  437. remote_worker_ids=to_worker_indices,
  438. timeout_seconds=timeout_seconds,
  439. )
  440. # If `from_worker_or_learner_group` is provided, also sync to this
  441. # `RunnerGroup`'s local worker.
  442. if self.local_runner is not None:
  443. if from_worker_or_learner_group is not None:
  444. self.local_runner.set_state(rl_module_state)
  445. def reset(self, new_remote_runners: List[ActorHandle]) -> None:
  446. """Hard overrides the remote `Runner`s in this set with the provided ones.
  447. Args:
  448. new_remote_workers: A list of new `Runner`s (as `ActorHandles`) to use as
  449. new remote workers.
  450. """
  451. self._worker_manager.clear()
  452. self._worker_manager.add_actors(new_remote_runners)
  453. def stop(self) -> None:
  454. """Calls `stop` on all `Runner`s (including the local one)."""
  455. try:
  456. # Make sure we stop all `Runner`s, include the ones that were just
  457. # restarted / recovered or that are tagged unhealthy (at least, we should
  458. # try).
  459. self.foreach_runner(
  460. lambda w: w.stop(), healthy_only=False, local_runner=True
  461. )
  462. except Exception:
  463. logger.exception("Failed to stop workers!")
  464. finally:
  465. self._worker_manager.clear()
  466. def foreach_runner(
  467. self,
  468. func: Union[Callable[[Runner], T], List[Callable[[Runner], T]], str, List[str]],
  469. *,
  470. kwargs=None,
  471. local_runner: bool = True,
  472. healthy_only: bool = True,
  473. remote_worker_ids: List[int] = None,
  474. timeout_seconds: Optional[float] = None,
  475. return_obj_refs: bool = False,
  476. mark_healthy: bool = False,
  477. ) -> List[T]:
  478. """Calls the given function with each `Runner` as its argument.
  479. Args:
  480. func: The function to call for each `Runner`s. The only call argument is
  481. the respective `Runner` instance.
  482. local_env_runner: Whether to apply `func` to local `Runner`, too.
  483. Default is True.
  484. healthy_only: Apply `func` on known-to-be healthy `Runner`s only.
  485. remote_worker_ids: Apply `func` on a selected set of remote `Runner`s.
  486. Use None (default) for all remote `Runner`s.
  487. timeout_seconds: Time to wait (in seconds) for results. Set this to 0.0 for
  488. fire-and-forget. Set this to None (default) to wait infinitely (i.e. for
  489. synchronous execution).
  490. return_obj_refs: Whether to return `ObjectRef` instead of actual results.
  491. Note, for fault tolerance reasons, these returned ObjectRefs should
  492. never be resolved with ray.get() outside of this `RunnerGroup`.
  493. mark_healthy: Whether to mark all those `Runner`s healthy again that are
  494. currently marked unhealthy AND that returned results from the remote
  495. call (within the given `timeout_seconds`).
  496. Note that `Runner`s are NOT set unhealthy, if they simply time out
  497. (only if they return a `RayActorError`).
  498. Also note that this setting is ignored if `healthy_only=True` (b/c
  499. `mark_healthy` only affects `Runner`s that are currently tagged as
  500. unhealthy).
  501. Returns:
  502. The list of return values of all calls to `func([worker])`.
  503. """
  504. assert (
  505. not return_obj_refs or not local_runner
  506. ), "Can not return `ObjectRef` from local worker."
  507. local_result = []
  508. if local_runner and self.local_runner is not None:
  509. if kwargs:
  510. local_kwargs = kwargs[0]
  511. kwargs = kwargs[1:]
  512. else:
  513. local_kwargs = {}
  514. kwargs = kwargs
  515. if isinstance(func, str):
  516. local_result = [getattr(self.local_runner, func)(**local_kwargs)]
  517. else:
  518. local_result = [func(self.local_runner, **local_kwargs)]
  519. if not self._worker_manager.actor_ids():
  520. return local_result
  521. remote_results = self._worker_manager.foreach_actor(
  522. func,
  523. kwargs=kwargs,
  524. healthy_only=healthy_only,
  525. remote_actor_ids=remote_worker_ids,
  526. timeout_seconds=timeout_seconds,
  527. return_obj_refs=return_obj_refs,
  528. mark_healthy=mark_healthy,
  529. )
  530. FaultTolerantActorManager.handle_remote_call_result_errors(
  531. remote_results, ignore_ray_errors=self._ignore_ray_errors_on_runners
  532. )
  533. # With application errors handled, return good results.
  534. remote_results = [r.get() for r in remote_results.ignore_errors()]
  535. return local_result + remote_results
  536. def foreach_runner_async(
  537. self,
  538. func: Union[Callable[[Runner], T], List[Callable[[Runner], T]], str, List[str]],
  539. *,
  540. healthy_only: bool = True,
  541. remote_worker_ids: List[int] = None,
  542. ) -> int:
  543. """Calls the given function asynchronously with each `Runner` as the argument.
  544. Does not return results directly. Instead, `fetch_ready_async_reqs()` can be
  545. used to pull results in an async manner whenever they are available.
  546. Args:
  547. func: The function to call for each `Runner`s. The only call argument is
  548. the respective `Runner` instance.
  549. healthy_only: Apply `func` on known-to-be healthy `Runner`s only.
  550. remote_worker_ids: Apply `func` on a selected set of remote `Runner`s.
  551. Returns:
  552. The number of async requests that have actually been made. This is the
  553. length of `remote_worker_ids` (or self.num_remote_workers()` if
  554. `remote_worker_ids` is None) minus the number of requests that were NOT
  555. made b/c a remote `Runner` already had its
  556. `max_remote_requests_in_flight_per_actor` counter reached.
  557. """
  558. return self._worker_manager.foreach_actor_async(
  559. func,
  560. healthy_only=healthy_only,
  561. remote_actor_ids=remote_worker_ids,
  562. )
  563. def fetch_ready_async_reqs(
  564. self,
  565. *,
  566. timeout_seconds: Optional[float] = 0.0,
  567. return_obj_refs: bool = False,
  568. mark_healthy: bool = False,
  569. ) -> List[Tuple[int, T]]:
  570. """Get esults from outstanding asynchronous requests that are ready.
  571. Args:
  572. timeout_seconds: Time to wait for results. Default is 0, meaning
  573. those requests that are already ready.
  574. return_obj_refs: Whether to return ObjectRef instead of actual results.
  575. mark_healthy: Whether to mark all those workers healthy again that are
  576. currently marked unhealthy AND that returned results from the remote
  577. call (within the given `timeout_seconds`).
  578. Note that workers are NOT set unhealthy, if they simply time out
  579. (only if they return a RayActorError).
  580. Also note that this setting is ignored if `healthy_only=True` (b/c
  581. `mark_healthy` only affects workers that are currently tagged as
  582. unhealthy).
  583. Returns:
  584. A list of results successfully returned from outstanding remote calls,
  585. paired with the indices of the callee workers.
  586. """
  587. remote_results = self._worker_manager.fetch_ready_async_reqs(
  588. timeout_seconds=timeout_seconds,
  589. return_obj_refs=return_obj_refs,
  590. mark_healthy=mark_healthy,
  591. )
  592. FaultTolerantActorManager.handle_remote_call_result_errors(
  593. remote_results,
  594. ignore_ray_errors=self._ignore_ray_errors_on_runners,
  595. )
  596. return [(r.actor_id, r.get()) for r in remote_results.ignore_errors()]
  597. def probe_unhealthy_runners(self) -> List[int]:
  598. """Checks for unhealthy workers and tries restoring their states.
  599. Returns:
  600. List of IDs of the workers that were restored.
  601. """
  602. return self._worker_manager.probe_unhealthy_actors(
  603. timeout_seconds=self.runner_health_probe_timeout_s,
  604. mark_healthy=True,
  605. )
  606. @property
  607. @abc.abstractmethod
  608. def runner_health_probe_timeout_s(self):
  609. """Number of seconds to wait for health probe calls to `Runner`s."""
  610. @property
  611. @abc.abstractmethod
  612. def runner_cls(self) -> Callable:
  613. """Class for each runner."""
  614. @property
  615. def _local_config(self) -> "AlgorithmConfig":
  616. """Returns the config for a local `Runner`."""
  617. return self.__local_config
  618. @property
  619. def local_runner(self) -> Runner:
  620. """Returns the local `Runner`."""
  621. return self._local_runner
  622. @property
  623. def healthy_runner_ids(self) -> List[int]:
  624. """Returns the list of remote `Runner` IDs."""
  625. return self._worker_manager.healthy_actor_ids()
  626. @property
  627. @abc.abstractmethod
  628. def num_runners(self) -> int:
  629. """Number of runners to schedule and manage."""
  630. @property
  631. def num_remote_runners(self) -> int:
  632. """Number of remote `Runner`s."""
  633. return self._worker_manager.num_actors()
  634. @property
  635. def num_healthy_remote_runners(self) -> int:
  636. """Returns the number of healthy remote `Runner`s."""
  637. return self._worker_manager.num_healthy_actors()
  638. @property
  639. def num_healthy_runners(self) -> int:
  640. """Returns the number of healthy `Runner`s."""
  641. return int(bool(self._local_runner)) + self.num_healthy_remote_runners()
  642. @property
  643. def num_in_flight_async_reqs(self) -> int:
  644. """Returns the number of in-flight async requests."""
  645. return self._worker_manager.num_outstanding_async_reqs()
  646. @property
  647. def num_remote_runner_restarts(self) -> int:
  648. """Returns the number of times managed remote `Runner`s have been restarted."""
  649. return self._worker_manager.total_num_restarts()
  650. @property
  651. @abc.abstractmethod
  652. def _remote_args(self):
  653. """Remote arguments for each runner."""
  654. @property
  655. @abc.abstractmethod
  656. def _ignore_ray_errors_on_runners(self):
  657. """If errors in runners should be ignored."""
  658. @property
  659. @abc.abstractmethod
  660. def _max_requests_in_flight_per_runner(self):
  661. """Maximum requests in flight per runner."""
  662. @property
  663. @abc.abstractmethod
  664. def _validate_runners_after_construction(self):
  665. """If runners should validated after constructed."""