| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751 |
- import abc
- import logging
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Dict,
- List,
- Optional,
- Tuple,
- TypeVar,
- Union,
- )
- import ray
- from ray.actor import ActorHandle
- from ray.exceptions import RayActorError
- from ray.rllib.core import (
- COMPONENT_ENV_TO_MODULE_CONNECTOR,
- COMPONENT_LEARNER,
- COMPONENT_MODULE_TO_ENV_CONNECTOR,
- COMPONENT_RL_MODULE,
- )
- from ray.rllib.core.learner.learner_group import LearnerGroup
- from ray.rllib.utils.actor_manager import FaultTolerantActorManager
- from ray.rllib.utils.metrics import NUM_ENV_STEPS_SAMPLED_LIFETIME, WEIGHTS_SEQ_NO
- from ray.rllib.utils.runners.runner import Runner
- from ray.rllib.utils.typing import PolicyID
- from ray.util.annotations import DeveloperAPI
- if TYPE_CHECKING:
- from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
- logger = logging.getLogger(__name__)
- # Generic type var for `foreach_*` methods.
- T = TypeVar("T")
- @DeveloperAPI
- class RunnerGroup(metaclass=abc.ABCMeta):
- def __init__(
- self,
- config: "AlgorithmConfig",
- # TODO (simon): Check, if this is needed. Derived classes could define
- # this if needed.
- # default_policy_class: Optional[Type[Policy]]
- local_runner: Optional[bool] = False,
- logdir: Optional[str] = None,
- # TODO (simon): Check, if still needed.
- tune_trial_id: Optional[str] = None,
- pg_offset: int = 0,
- _setup: bool = True,
- **kwargs: Dict[str, Any],
- ) -> None:
- # TODO (simon): Remove when old stack is deprecated.
- self.config: AlgorithmConfig = (
- AlgorithmConfig.from_dict(config)
- if isinstance(config, dict)
- else (config or AlgorithmConfig())
- )
- self._remote_config = config
- self._remote_config_obj_ref = ray.put(self._remote_config)
- self._tune_trial_id = tune_trial_id
- self._pg_offset = pg_offset
- self._logdir = logdir
- self._worker_manager = FaultTolerantActorManager(
- max_remote_requests_in_flight_per_actor=self._max_requests_in_flight_per_runner,
- init_id=1,
- )
- if _setup:
- try:
- self._setup(
- config=config,
- num_runners=self.num_runners,
- local_runner=local_runner,
- **kwargs,
- )
- # `RunnerGroup` creation possibly fails, if some (remote) workers cannot
- # be initialized properly (due to some errors in the `Runners`'s
- # constructor).
- except RayActorError as e:
- # In case of an actor (remote worker) init failure, the remote worker
- # may still exist and will be accessible, however, e.g. calling
- # its `run.remote()` would result in strange "property not found"
- # errors.
- if e.actor_init_failed:
- # Raise the original error here that the `Runners` raised
- # during its construction process. This is to enforce transparency
- # for the user (better to understand the real reason behind the
- # failure).
- # - e.args[0]: The `RayTaskError` (inside the caught `RayActorError`).
- # - e.args[0].args[2]: The original `Exception` (e.g. a `ValueError` due
- # to a config mismatch) thrown inside the actor.
- raise e.args[0].args[2]
- # In any other case, raise the `RayActorError` as-is.
- else:
- raise e
- def _setup(
- self,
- *,
- config: Optional["AlgorithmConfig"] = None,
- num_runners: int = 0,
- local_runner: Optional[bool] = False,
- validate: Optional[bool] = None,
- **kwargs: Dict[str, Any],
- ) -> None:
- # TODO (simon): Deprecate this as soon as we are deprecating the old stack.
- self._local_runner = None
- if num_runners == 0:
- local_runner = True
- self.__local_config = config
- # Create a number of @ray.remote workers.
- self.add_runners(
- num_runners,
- validate=validate
- if validate is not None
- else self._validate_runners_after_construction,
- **kwargs,
- )
- if local_runner:
- self._local_runner = self._make_runner(
- runner_index=0,
- num_runners=num_runners,
- config=self._local_config,
- **kwargs,
- )
- def add_runners(self, num_runners: int, validate: bool = False, **kwargs) -> None:
- """Creates and adds a number of remote runners to this runner set."""
- old_num_runners = self._worker_manager.num_actors()
- new_runners = [
- self._make_runner(
- runner_index=old_num_runners + i + 1,
- num_runners=old_num_runners + num_runners,
- # `self._remote_config` can be large and it's best practice to
- # pass it by reference instead of value
- # (https://docs.ray.io/en/latest/ray-core/patterns/pass-large-arg-by-value.html) # noqa
- config=self._remote_config_obj_ref,
- **kwargs,
- )
- for i in range(num_runners)
- ]
- # Add the new workers to the worker manager.
- self._worker_manager.add_actors(new_runners)
- # Validate here, whether all remote workers have been constructed properly
- # and are "up and running". Establish initial states.
- if validate:
- self.validate()
- def validate(self) -> Exception:
- for result in self._worker_manager.foreach_actor(lambda w: w.assert_healthy()):
- # Simiply raise the error, which will get handled by the try-except
- # clause around the _setup().
- if not result.ok:
- e = result.get()
- if self._ignore_ray_errors_on_runners:
- logger.error(
- f"Validation of {self.runner_cls.__name__} failed! Error={str(e)}"
- )
- else:
- raise e
- def _make_runner(
- self,
- *,
- runner_index: int,
- num_runners: int,
- recreated_runner: bool = False,
- config: "AlgorithmConfig",
- **kwargs,
- ) -> ActorHandle:
- # TODO (simon): Change this in the `EnvRunner` API
- # to `runner_*`.
- kwargs = dict(
- config=config,
- worker_index=runner_index,
- num_workers=num_runners,
- recreated_worker=recreated_runner,
- log_dir=self._logdir,
- tune_trial_id=self._tune_trial_id,
- **kwargs,
- )
- # If a local runner is requested just return a runner instance.
- if runner_index == 0:
- return self.runner_cls(**kwargs)
- # Otherwise define a bundle index and schedule the remote worker.
- pg_bundle_idx = (
- -1
- if ray.util.get_current_placement_group() is None
- else self._pg_offset + runner_index
- )
- return (
- ray.remote(**self._remote_args)(self.runner_cls)
- .options(placement_group_bundle_index=pg_bundle_idx)
- .remote(**kwargs)
- )
- def sync_runner_states(
- self,
- *,
- config: "AlgorithmConfig",
- from_runner: Optional[Runner] = None,
- env_steps_sampled: Optional[int] = None,
- connector_states: Optional[List[Dict[str, Any]]] = None,
- rl_module_state: Optional[Dict[str, Any]] = None,
- runner_indices_to_update: Optional[List[int]] = None,
- env_to_module=None,
- module_to_env=None,
- **kwargs,
- ):
- """Synchronizes the connectors of this `RunnerGroup`'s `Runner`s."""
- # If no `Runner` is passed in synchronize through the local `Runner`.
- from_runner = from_runner or self.local_runner
- merge = config.merge_runner_states or (
- config.merge_runner_states == "training_only" and config.in_evaluation
- )
- broadcast = config.broadcast_runner_states
- # Early out if the number of (healthy) remote workers is 0. In this case, the
- # local worker is the only operating worker and thus of course always holds
- # the reference connector state.
- if self.num_healthy_remote_runners == 0 and self.local_runner:
- self.local_runner.set_state(
- {
- **(
- {NUM_ENV_STEPS_SAMPLED_LIFETIME: env_steps_sampled}
- if env_steps_sampled is not None
- else {}
- ),
- **(rl_module_state or {}),
- }
- )
- # Also early out, if we don't merge AND don't broadcast.
- if not merge and not broadcast:
- return
- # Use states from all remote `Runner`s.
- if merge:
- if connector_states == []:
- runner_states = {}
- else:
- if connector_states is None:
- connector_states = self.foreach_runner(
- lambda w: w.get_state(
- components=[
- COMPONENT_ENV_TO_MODULE_CONNECTOR,
- COMPONENT_MODULE_TO_ENV_CONNECTOR,
- ]
- ),
- local_runner=False,
- timeout_seconds=(
- config.sync_filters_on_rollout_workers_timeout_s
- ),
- )
- env_to_module_states = [
- s[COMPONENT_ENV_TO_MODULE_CONNECTOR]
- for s in connector_states
- if COMPONENT_ENV_TO_MODULE_CONNECTOR in s
- ]
- module_to_env_states = [
- s[COMPONENT_MODULE_TO_ENV_CONNECTOR]
- for s in connector_states
- if COMPONENT_MODULE_TO_ENV_CONNECTOR in s
- ]
- if (
- self.local_runner is not None
- and hasattr(self.local_runner, "_env_to_module")
- and hasattr(self.local_runner, "_module_to_env")
- ):
- assert env_to_module is None
- env_to_module = self.local_runner._env_to_module
- assert module_to_env is None
- module_to_env = self.local_runner._module_to_env
- runner_states = {}
- if env_to_module_states:
- runner_states.update(
- {
- COMPONENT_ENV_TO_MODULE_CONNECTOR: (
- env_to_module.merge_states(env_to_module_states)
- ),
- }
- )
- if module_to_env_states:
- runner_states.update(
- {
- COMPONENT_MODULE_TO_ENV_CONNECTOR: (
- module_to_env.merge_states(module_to_env_states)
- ),
- }
- )
- # Ignore states from remote `Runner`s (use the current `from_worker` states
- # only).
- else:
- if from_runner is None:
- runner_states = {
- COMPONENT_ENV_TO_MODULE_CONNECTOR: env_to_module.get_state(),
- COMPONENT_MODULE_TO_ENV_CONNECTOR: module_to_env.get_state(),
- }
- else:
- runner_states = from_runner.get_state(
- components=[
- COMPONENT_ENV_TO_MODULE_CONNECTOR,
- COMPONENT_MODULE_TO_ENV_CONNECTOR,
- ]
- )
- # Update the global number of environment steps, if necessary.
- # Make sure to divide by the number of env runners (such that each `Runner`
- # knows (roughly) its own(!) lifetime count and can infer the global lifetime
- # count from it).
- if env_steps_sampled is not None:
- runner_states[NUM_ENV_STEPS_SAMPLED_LIFETIME] = env_steps_sampled // (
- config.num_runners or 1
- )
- # If we do NOT want remote `Runner`s to get their Connector states updated,
- # only update the local worker here (with all state components, except the model
- # weights) and then remove the connector components.
- if not broadcast:
- if self.local_runner is not None:
- self.local_runner.set_state(runner_states)
- else:
- env_to_module.set_state(
- runner_states.get(COMPONENT_ENV_TO_MODULE_CONNECTOR), {}
- )
- module_to_env.set_state(
- runner_states.get(COMPONENT_MODULE_TO_ENV_CONNECTOR), {}
- )
- runner_states.pop(COMPONENT_ENV_TO_MODULE_CONNECTOR, None)
- runner_states.pop(COMPONENT_MODULE_TO_ENV_CONNECTOR, None)
- # If there are components in the state left -> Update remote workers with these
- # state components (and maybe the local worker, if it hasn't been updated yet).
- if runner_states:
- # Update the local `Runner`, but NOT with the weights. If used at all for
- # evaluation (through the user calling `self.evaluate`), RLlib would update
- # the weights up front either way.
- if self.local_runner is not None and broadcast:
- self.local_runner.set_state(runner_states)
- # Send the model weights only to remote `Runner`s.
- # In case the local `Runner` is ever needed for evaluation,
- # RLlib updates its weight right before such an eval step.
- if rl_module_state:
- runner_states.update(rl_module_state)
- # Broadcast updated states back to all workers.
- self.foreach_runner(
- "set_state", # Call the `set_state()` remote method.
- kwargs=dict(state=runner_states),
- remote_worker_ids=runner_indices_to_update,
- local_runner=False,
- timeout_seconds=0.0, # This is a state update -> Fire-and-forget.
- )
- def sync_weights(
- self,
- policies: Optional[List[PolicyID]] = None,
- from_worker_or_learner_group: Optional[Union[Runner, "LearnerGroup"]] = None,
- to_worker_indices: Optional[List[int]] = None,
- timeout_seconds: Optional[float] = 0.0,
- inference_only: Optional[bool] = False,
- **kwargs,
- ) -> None:
- """Syncs model weights from the given weight source to all remote workers.
- Weight source can be either a (local) rollout worker or a learner_group. It
- should just implement a `get_weights` method.
- Args:
- policies: Optional list of PolicyIDs to sync weights for.
- If None (default), sync weights to/from all policies.
- from_worker_or_learner_group: Optional (local) `Runner` instance or
- LearnerGroup instance to sync from. If None (default),
- sync from this `Runner`Group's local worker.
- to_worker_indices: Optional list of worker indices to sync the
- weights to. If None (default), sync to all remote workers.
- global_vars: An optional global vars dict to set this
- worker to. If None, do not update the global_vars.
- timeout_seconds: Timeout in seconds to wait for the sync weights
- calls to complete. Default is 0.0 (fire-and-forget, do not wait
- for any sync calls to finish). Setting this to 0.0 might significantly
- improve algorithm performance, depending on the algo's `training_step`
- logic.
- inference_only: Sync weights with workers that keep inference-only
- modules. This is needed for algorithms in the new stack that
- use inference-only modules. In this case only a part of the
- parameters are synced to the workers. Default is False.
- """
- if self.local_runner is None and from_worker_or_learner_group is None:
- raise TypeError(
- "No `local_runner` in `RunnerGroup`! Must provide "
- "`from_worker_or_learner_group` arg in `sync_weights()`!"
- )
- # Only sync if we have remote workers or `from_worker_or_trainer` is provided.
- rl_module_state = None
- if self.num_remote_runners or from_worker_or_learner_group is not None:
- weights_src = (
- from_worker_or_learner_group
- if from_worker_or_learner_group is not None
- else self.local_runner
- )
- if weights_src is None:
- raise ValueError(
- "`from_worker_or_trainer` is None. In this case, `RunnerGroup`^ "
- "should have `local_runner`. But `local_runner` is also `None`."
- )
- modules = (
- [COMPONENT_RL_MODULE + "/" + p for p in policies]
- if policies is not None
- else [COMPONENT_RL_MODULE]
- )
- # LearnerGroup has a Learner, which has an RLModule.
- if isinstance(weights_src, LearnerGroup):
- rl_module_state = weights_src.get_state(
- components=[COMPONENT_LEARNER + "/" + m for m in modules],
- inference_only=inference_only,
- )[COMPONENT_LEARNER]
- # `Runner` (new API stack).
- else:
- # Runner (remote) has a RLModule.
- # TODO (sven): Replace this with a new ActorManager API:
- # try_remote_request_till_success("get_state") -> tuple(int,
- # remoteresult)
- # `weights_src` could be the ActorManager, then. Then RLlib would know
- # that it has to ping the manager to try all healthy actors until the
- # first returns something.
- if isinstance(weights_src, ActorHandle):
- rl_module_state = ray.get(
- weights_src.get_state.remote(
- components=modules,
- inference_only=inference_only,
- )
- )
- # `Runner` (local) has an RLModule.
- else:
- rl_module_state = weights_src.get_state(
- components=modules,
- inference_only=inference_only,
- )
- # Make sure `rl_module_state` only contains the weights and the
- # weight seq no, nothing else.
- rl_module_state = {
- k: v
- for k, v in rl_module_state.items()
- if k in [COMPONENT_RL_MODULE, WEIGHTS_SEQ_NO]
- }
- # Move weights to the object store to avoid having to make n pickled
- # copies of the weights dict for each worker.
- rl_module_state_ref = ray.put(rl_module_state)
- # Sync to specified remote workers in this `Runner`Group.
- self.foreach_runner(
- func="set_state",
- kwargs=dict(state=rl_module_state_ref),
- local_runner=False, # Do not sync back to local worker.
- remote_worker_ids=to_worker_indices,
- timeout_seconds=timeout_seconds,
- )
- # If `from_worker_or_learner_group` is provided, also sync to this
- # `RunnerGroup`'s local worker.
- if self.local_runner is not None:
- if from_worker_or_learner_group is not None:
- self.local_runner.set_state(rl_module_state)
- def reset(self, new_remote_runners: List[ActorHandle]) -> None:
- """Hard overrides the remote `Runner`s in this set with the provided ones.
- Args:
- new_remote_workers: A list of new `Runner`s (as `ActorHandles`) to use as
- new remote workers.
- """
- self._worker_manager.clear()
- self._worker_manager.add_actors(new_remote_runners)
- def stop(self) -> None:
- """Calls `stop` on all `Runner`s (including the local one)."""
- try:
- # Make sure we stop all `Runner`s, include the ones that were just
- # restarted / recovered or that are tagged unhealthy (at least, we should
- # try).
- self.foreach_runner(
- lambda w: w.stop(), healthy_only=False, local_runner=True
- )
- except Exception:
- logger.exception("Failed to stop workers!")
- finally:
- self._worker_manager.clear()
- def foreach_runner(
- self,
- func: Union[Callable[[Runner], T], List[Callable[[Runner], T]], str, List[str]],
- *,
- kwargs=None,
- local_runner: bool = True,
- healthy_only: bool = True,
- remote_worker_ids: List[int] = None,
- timeout_seconds: Optional[float] = None,
- return_obj_refs: bool = False,
- mark_healthy: bool = False,
- ) -> List[T]:
- """Calls the given function with each `Runner` as its argument.
- Args:
- func: The function to call for each `Runner`s. The only call argument is
- the respective `Runner` instance.
- local_env_runner: Whether to apply `func` to local `Runner`, too.
- Default is True.
- healthy_only: Apply `func` on known-to-be healthy `Runner`s only.
- remote_worker_ids: Apply `func` on a selected set of remote `Runner`s.
- Use None (default) for all remote `Runner`s.
- timeout_seconds: Time to wait (in seconds) for results. Set this to 0.0 for
- fire-and-forget. Set this to None (default) to wait infinitely (i.e. for
- synchronous execution).
- return_obj_refs: Whether to return `ObjectRef` instead of actual results.
- Note, for fault tolerance reasons, these returned ObjectRefs should
- never be resolved with ray.get() outside of this `RunnerGroup`.
- mark_healthy: Whether to mark all those `Runner`s healthy again that are
- currently marked unhealthy AND that returned results from the remote
- call (within the given `timeout_seconds`).
- Note that `Runner`s are NOT set unhealthy, if they simply time out
- (only if they return a `RayActorError`).
- Also note that this setting is ignored if `healthy_only=True` (b/c
- `mark_healthy` only affects `Runner`s that are currently tagged as
- unhealthy).
- Returns:
- The list of return values of all calls to `func([worker])`.
- """
- assert (
- not return_obj_refs or not local_runner
- ), "Can not return `ObjectRef` from local worker."
- local_result = []
- if local_runner and self.local_runner is not None:
- if kwargs:
- local_kwargs = kwargs[0]
- kwargs = kwargs[1:]
- else:
- local_kwargs = {}
- kwargs = kwargs
- if isinstance(func, str):
- local_result = [getattr(self.local_runner, func)(**local_kwargs)]
- else:
- local_result = [func(self.local_runner, **local_kwargs)]
- if not self._worker_manager.actor_ids():
- return local_result
- remote_results = self._worker_manager.foreach_actor(
- func,
- kwargs=kwargs,
- healthy_only=healthy_only,
- remote_actor_ids=remote_worker_ids,
- timeout_seconds=timeout_seconds,
- return_obj_refs=return_obj_refs,
- mark_healthy=mark_healthy,
- )
- FaultTolerantActorManager.handle_remote_call_result_errors(
- remote_results, ignore_ray_errors=self._ignore_ray_errors_on_runners
- )
- # With application errors handled, return good results.
- remote_results = [r.get() for r in remote_results.ignore_errors()]
- return local_result + remote_results
- def foreach_runner_async(
- self,
- func: Union[Callable[[Runner], T], List[Callable[[Runner], T]], str, List[str]],
- *,
- healthy_only: bool = True,
- remote_worker_ids: List[int] = None,
- ) -> int:
- """Calls the given function asynchronously with each `Runner` as the argument.
- Does not return results directly. Instead, `fetch_ready_async_reqs()` can be
- used to pull results in an async manner whenever they are available.
- Args:
- func: The function to call for each `Runner`s. The only call argument is
- the respective `Runner` instance.
- healthy_only: Apply `func` on known-to-be healthy `Runner`s only.
- remote_worker_ids: Apply `func` on a selected set of remote `Runner`s.
- Returns:
- The number of async requests that have actually been made. This is the
- length of `remote_worker_ids` (or self.num_remote_workers()` if
- `remote_worker_ids` is None) minus the number of requests that were NOT
- made b/c a remote `Runner` already had its
- `max_remote_requests_in_flight_per_actor` counter reached.
- """
- return self._worker_manager.foreach_actor_async(
- func,
- healthy_only=healthy_only,
- remote_actor_ids=remote_worker_ids,
- )
- def fetch_ready_async_reqs(
- self,
- *,
- timeout_seconds: Optional[float] = 0.0,
- return_obj_refs: bool = False,
- mark_healthy: bool = False,
- ) -> List[Tuple[int, T]]:
- """Get esults from outstanding asynchronous requests that are ready.
- Args:
- timeout_seconds: Time to wait for results. Default is 0, meaning
- those requests that are already ready.
- return_obj_refs: Whether to return ObjectRef instead of actual results.
- mark_healthy: Whether to mark all those workers healthy again that are
- currently marked unhealthy AND that returned results from the remote
- call (within the given `timeout_seconds`).
- Note that workers are NOT set unhealthy, if they simply time out
- (only if they return a RayActorError).
- Also note that this setting is ignored if `healthy_only=True` (b/c
- `mark_healthy` only affects workers that are currently tagged as
- unhealthy).
- Returns:
- A list of results successfully returned from outstanding remote calls,
- paired with the indices of the callee workers.
- """
- remote_results = self._worker_manager.fetch_ready_async_reqs(
- timeout_seconds=timeout_seconds,
- return_obj_refs=return_obj_refs,
- mark_healthy=mark_healthy,
- )
- FaultTolerantActorManager.handle_remote_call_result_errors(
- remote_results,
- ignore_ray_errors=self._ignore_ray_errors_on_runners,
- )
- return [(r.actor_id, r.get()) for r in remote_results.ignore_errors()]
- def probe_unhealthy_runners(self) -> List[int]:
- """Checks for unhealthy workers and tries restoring their states.
- Returns:
- List of IDs of the workers that were restored.
- """
- return self._worker_manager.probe_unhealthy_actors(
- timeout_seconds=self.runner_health_probe_timeout_s,
- mark_healthy=True,
- )
- @property
- @abc.abstractmethod
- def runner_health_probe_timeout_s(self):
- """Number of seconds to wait for health probe calls to `Runner`s."""
- @property
- @abc.abstractmethod
- def runner_cls(self) -> Callable:
- """Class for each runner."""
- @property
- def _local_config(self) -> "AlgorithmConfig":
- """Returns the config for a local `Runner`."""
- return self.__local_config
- @property
- def local_runner(self) -> Runner:
- """Returns the local `Runner`."""
- return self._local_runner
- @property
- def healthy_runner_ids(self) -> List[int]:
- """Returns the list of remote `Runner` IDs."""
- return self._worker_manager.healthy_actor_ids()
- @property
- @abc.abstractmethod
- def num_runners(self) -> int:
- """Number of runners to schedule and manage."""
- @property
- def num_remote_runners(self) -> int:
- """Number of remote `Runner`s."""
- return self._worker_manager.num_actors()
- @property
- def num_healthy_remote_runners(self) -> int:
- """Returns the number of healthy remote `Runner`s."""
- return self._worker_manager.num_healthy_actors()
- @property
- def num_healthy_runners(self) -> int:
- """Returns the number of healthy `Runner`s."""
- return int(bool(self._local_runner)) + self.num_healthy_remote_runners()
- @property
- def num_in_flight_async_reqs(self) -> int:
- """Returns the number of in-flight async requests."""
- return self._worker_manager.num_outstanding_async_reqs()
- @property
- def num_remote_runner_restarts(self) -> int:
- """Returns the number of times managed remote `Runner`s have been restarted."""
- return self._worker_manager.total_num_restarts()
- @property
- @abc.abstractmethod
- def _remote_args(self):
- """Remote arguments for each runner."""
- @property
- @abc.abstractmethod
- def _ignore_ray_errors_on_runners(self):
- """If errors in runners should be ignored."""
- @property
- @abc.abstractmethod
- def _max_requests_in_flight_per_runner(self):
- """Maximum requests in flight per runner."""
- @property
- @abc.abstractmethod
- def _validate_runners_after_construction(self):
- """If runners should validated after constructed."""
|