| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- """Manage, parse and validate options for Ray tasks, actors and actor methods."""
- import warnings
- from dataclasses import dataclass
- from typing import Any, Callable, Dict, Optional, Tuple, Union
- import ray
- from ray._private import ray_constants
- from ray._private.label_utils import (
- validate_fallback_strategy,
- validate_label_selector,
- )
- from ray._private.utils import get_ray_doc_version
- from ray.util.placement_group import PlacementGroup
- from ray.util.scheduling_strategies import (
- NodeAffinitySchedulingStrategy,
- NodeLabelSchedulingStrategy,
- PlacementGroupSchedulingStrategy,
- )
@dataclass
class Option:
    """Describe how a single option keyword is checked and what it defaults to."""

    # Type (or tuple of types) the value must be an instance of.
    # None disables the type check.
    type_constraint: Optional[Union[type, Tuple[type]]] = None
    # Callable that inspects the value and returns None when it is acceptable,
    # or the error message to report otherwise. None disables the value check.
    value_constraint: Optional[Callable[[Any], Optional[str]]] = None
    # Default value of the option.
    default_value: Any = None

    def validate(self, keyword: str, value: Any) -> None:
        """Check ``value`` against the type and value constraints, in that order.

        Args:
            keyword: Name of the option, used in the type error message.
            value: The value supplied for the option.

        Raises:
            TypeError: If ``value`` is not an instance of ``type_constraint``.
            ValueError: If ``value_constraint`` returns a non-empty message.
        """
        expected_type = self.type_constraint
        if expected_type is not None and not isinstance(value, expected_type):
            raise TypeError(
                f"The type of keyword '{keyword}' must be {expected_type}, "
                f"but received type {type(value)}"
            )

        checker = self.value_constraint
        if checker is None:
            return

        problem = checker(value)
        if problem:
            raise ValueError(problem)
def _counting_option(name: str, infinite: bool = True, default_value: Any = None):
    """Build an Option for a discrete, count-like keyword.

    Args:
        name: The name of the option keyword (used in the error message).
        infinite: If True, -1 is additionally accepted and stands for infinity.
        default_value: The default value for this option.

    Returns:
        An Option accepting None or an integer no smaller than the lower bound
        (-1 when ``infinite`` is True, otherwise 0).
    """
    # The two flavours differ only in the smallest accepted integer and in the
    # wording of the message reported for anything below it.
    if infinite:
        lower_bound = -1
        rejection = (
            f"The keyword '{name}' only accepts None, 0, -1"
            " or a positive integer, where -1 represents infinity."
        )
    else:
        lower_bound = 0
        rejection = f"The keyword '{name}' only accepts None, 0 or a positive integer."

    def _check_count(x):
        if x is None or x >= lower_bound:
            return None
        return rejection

    return Option((int, type(None)), _check_count, default_value=default_value)
- def _validate_resource_quantity(name, quantity):
- if quantity < 0:
- return f"The quantity of resource {name} cannot be negative"
- if (
- isinstance(quantity, float)
- and quantity != 0.0
- and int(quantity * ray._raylet.RESOURCE_UNIT_SCALING) == 0
- ):
- return (
- f"The precision of the fractional quantity of resource {name}"
- " cannot go beyond 0.0001"
- )
- resource_name = "GPU" if name == "num_gpus" else name
- if resource_name in ray._private.accelerators.get_all_accelerator_resource_names():
- (
- valid,
- error_message,
- ) = ray._private.accelerators.get_accelerator_manager_for_resource(
- resource_name
- ).validate_resource_request_quantity(
- quantity
- )
- if not valid:
- return error_message
- return None
def _resource_option(name: str, default_value: Any = None):
    """Build an Option for a resource quantity keyword.

    Args:
        name: The option keyword; forwarded to the quantity check.
        default_value: The default value for this option.

    Returns:
        An Option accepting None, an int or a float. Non-None values are
        checked with ``_validate_resource_quantity``.
    """

    def _check_quantity(quantity):
        # None means "not specified" and is always accepted.
        if quantity is None:
            return None
        return _validate_resource_quantity(name, quantity)

    return Option(
        type_constraint=(float, int, type(None)),
        value_constraint=_check_quantity,
        default_value=default_value,
    )
- def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]:
- if resources is None:
- return None
- if "CPU" in resources or "GPU" in resources:
- return (
- "Use the 'num_cpus' and 'num_gpus' keyword instead of 'CPU' and 'GPU' "
- "in 'resources' keyword"
- )
- for name, quantity in resources.items():
- possible_error_message = _validate_resource_quantity(name, quantity)
- if possible_error_message:
- return possible_error_message
- return None
# Options shared by the task and the actor option tables defined in this module.
# Key order is significant: it is the order in which keywords are listed in
# error messages built from these tables.
_common_options = {
    "label_selector": Option(
        type_constraint=(dict, type(None)),
        value_constraint=lambda selector: validate_label_selector(selector),
    ),
    "fallback_strategy": Option(
        type_constraint=(list, type(None)),
        value_constraint=lambda strategy: validate_fallback_strategy(strategy),
    ),
    "accelerator_type": Option(type_constraint=(str, type(None))),
    "memory": _resource_option("memory"),
    "name": Option(type_constraint=(str, type(None))),
    "num_cpus": _resource_option("num_cpus"),
    "num_gpus": _resource_option("num_gpus"),
    "object_store_memory": _counting_option("object_store_memory", infinite=False),
    # TODO(suquark): "placement_group", "placement_group_bundle_index"
    # and "placement_group_capture_child_tasks" are deprecated,
    # use "scheduling_strategy" instead.
    "placement_group": Option(
        type_constraint=(type(None), str, PlacementGroup),
        default_value="default",
    ),
    "placement_group_bundle_index": Option(type_constraint=int, default_value=-1),
    "placement_group_capture_child_tasks": Option(
        type_constraint=(bool, type(None))
    ),
    "resources": Option(
        type_constraint=(dict, type(None)),
        value_constraint=lambda resources: _validate_resources(resources),
    ),
    "runtime_env": Option(type_constraint=(dict, type(None))),
    "scheduling_strategy": Option(
        type_constraint=(
            type(None),
            str,
            PlacementGroupSchedulingStrategy,
            NodeAffinitySchedulingStrategy,
            NodeLabelSchedulingStrategy,
        )
    ),
    "enable_task_events": Option(type_constraint=bool, default_value=True),
    "_labels": Option(type_constraint=(dict, type(None))),
}
def issubclass_safe(obj: Any, cls_: type) -> bool:
    """Return ``issubclass(obj, cls_)``, or False if that call raises TypeError.

    ``issubclass`` raises TypeError when its first argument is not a class;
    this wrapper reports such inputs as "not a subclass" instead.
    """
    try:
        verdict = issubclass(obj, cls_)
    except TypeError:
        verdict = False
    return verdict
# Options that only apply to tasks. Entries sharing a key with "_common_options"
# ("num_cpus", "object_store_memory") are meant to override the common entry when
# the task option table is assembled.
_task_only_options = {
    "max_calls": _counting_option("max_calls", False, default_value=0),
    # Normal tasks may be retried on failure this many times.
    # TODO(swang): Allow this to be set globally for an application.
    "max_retries": _counting_option(
        "max_retries", default_value=ray_constants.DEFAULT_TASK_MAX_RETRIES
    ),
    # override "_common_options"
    "num_cpus": _resource_option("num_cpus", default_value=1),
    "num_returns": Option(
        (int, str, type(None)),
        # The type constraint admits arbitrary strings, so the numeric comparison
        # must only run on integers. Otherwise an unsupported string such as
        # "foo" raises a bare TypeError from `"foo" >= 0` instead of producing
        # the explanatory message below.
        lambda x: None
        if (
            x is None
            or x == "dynamic"
            or x == "streaming"
            or (isinstance(x, int) and x >= 0)
        )
        else "Default None. When None is passed, "
        "The default value is 1 for a task and actor task, and "
        "'streaming' for generator tasks and generator actor tasks. "
        "The keyword 'num_returns' only accepts None, "
        "a non-negative integer, "
        "'streaming' (for generators), or 'dynamic'. 'dynamic' flag "
        "will be deprecated in the future, and it is recommended to use "
        "'streaming' instead.",
        default_value=None,
    ),
    "object_store_memory": Option(  # override "_common_options"
        (int, type(None)),
        # Any non-None value is rejected for tasks.
        lambda x: None
        if (x is None)
        else "Setting 'object_store_memory' is not implemented for tasks",
    ),
    "retry_exceptions": Option(
        (bool, list, tuple),
        # Either a plain boolean, or a list/tuple whose members are all
        # Exception subclasses (non-class members are rejected, not raised on).
        lambda x: None
        if (
            isinstance(x, bool)
            or (
                isinstance(x, (list, tuple))
                and all(issubclass_safe(x_, Exception) for x_ in x)
            )
        )
        else "retry_exceptions must be either a boolean or a list of exceptions",
        default_value=False,
    ),
    "_generator_backpressure_num_objects": Option(
        (int, type(None)),
        # Only the value 0 is rejected; None and any other integer pass.
        lambda x: None
        if x != 0
        else (
            "_generator_backpressure_num_objects=0 is not allowed. "
            "Use a value > 0. If the value is equal to 1, the behavior "
            "is identical to Python generator (generator 1 object "
            "whenever `next` is called). Use -1 to disable this feature. "
        ),
    ),
}
# Options that only apply to actors.
_actor_only_options = {
    "concurrency_groups": Option(type_constraint=(list, dict, type(None))),
    "enable_tensor_transport": Option(
        type_constraint=(bool, type(None)), default_value=None
    ),
    "lifetime": Option(
        type_constraint=(str, type(None)),
        value_constraint=lambda lifetime: None
        if lifetime in (None, "detached", "non_detached")
        else "actor `lifetime` argument must be one of 'detached', "
        "'non_detached' and 'None'.",
    ),
    "max_concurrency": _counting_option("max_concurrency", infinite=False),
    "max_restarts": _counting_option("max_restarts", default_value=0),
    "max_task_retries": _counting_option("max_task_retries", default_value=0),
    "max_pending_calls": _counting_option("max_pending_calls", default_value=-1),
    "namespace": Option(type_constraint=(str, type(None))),
    "get_if_exists": Option(type_constraint=bool, default_value=False),
    "allow_out_of_order_execution": Option(type_constraint=(bool, type(None))),
}
# The merge order below matters: when two tables define the same key, the table
# applied later replaces the entry of the table applied earlier. This is how
# tasks and actors end up with their own default values for a shared keyword.

# Every known keyword.
# priority: _common_options > _actor_only_options > _task_only_options
valid_options: Dict[str, Option] = dict(_task_only_options)
valid_options.update(_actor_only_options)
valid_options.update(_common_options)

# priority: _task_only_options > _common_options
task_options: Dict[str, Option] = dict(_common_options)
task_options.update(_task_only_options)

# priority: _actor_only_options > _common_options
actor_options: Dict[str, Option] = dict(_common_options)
actor_options.update(_actor_only_options)

# Message describing the accepted forms of the @ray.remote decorator; it lists
# every keyword in "valid_options".
remote_args_error_string = (
    "The @ray.remote decorator must be applied either with no arguments and no "
    "parentheses, for example '@ray.remote', or it must be applied using some of "
    f"the arguments in the list {list(valid_options)}, for example "
    "'@ray.remote(num_returns=2, resources={\"CustomResource\": 1})'."
)
- def _check_deprecate_placement_group(options: Dict[str, Any]):
- """Check if deprecated placement group option exists."""
- placement_group = options.get("placement_group", "default")
- scheduling_strategy = options.get("scheduling_strategy")
- # TODO(suquark): @ray.remote(placement_group=None) is used in
- # "python/ray.data._internal/remote_fn.py" and many other places,
- # while "ray.data.read_api.read_datasource" set "scheduling_strategy=SPREAD".
- # This might be a bug, but it is also ok to allow them co-exist.
- if (placement_group not in ("default", None)) and (scheduling_strategy is not None):
- raise ValueError(
- "Placement groups should be specified via the "
- "scheduling_strategy option. "
- "The placement_group option is deprecated."
- )
- def _warn_if_using_deprecated_placement_group(
- options: Dict[str, Any], caller_stacklevel: int
- ):
- placement_group = options["placement_group"]
- placement_group_bundle_index = options["placement_group_bundle_index"]
- placement_group_capture_child_tasks = options["placement_group_capture_child_tasks"]
- if placement_group != "default":
- warnings.warn(
- "placement_group parameter is deprecated. Use "
- "scheduling_strategy=PlacementGroupSchedulingStrategy(...) "
- "instead, see the usage at "
- f"https://docs.ray.io/en/{get_ray_doc_version()}/ray-core/package-ref.html#ray-remote.", # noqa: E501
- DeprecationWarning,
- stacklevel=caller_stacklevel + 1,
- )
- if placement_group_bundle_index != -1:
- warnings.warn(
- "placement_group_bundle_index parameter is deprecated. Use "
- "scheduling_strategy=PlacementGroupSchedulingStrategy(...) "
- "instead, see the usage at "
- f"https://docs.ray.io/en/{get_ray_doc_version()}/ray-core/package-ref.html#ray-remote.", # noqa: E501
- DeprecationWarning,
- stacklevel=caller_stacklevel + 1,
- )
- if placement_group_capture_child_tasks:
- warnings.warn(
- "placement_group_capture_child_tasks parameter is deprecated. Use "
- "scheduling_strategy=PlacementGroupSchedulingStrategy(...) "
- "instead, see the usage at "
- f"https://docs.ray.io/en/{get_ray_doc_version()}/ray-core/package-ref.html#ray-remote.", # noqa: E501
- DeprecationWarning,
- stacklevel=caller_stacklevel + 1,
- )
def validate_task_options(options: Dict[str, Any], in_options: bool):
    """Options check for Ray tasks.

    Each keyword must be a known task option and its value must pass that
    option's ``validate`` call; errors raised there propagate unchanged.

    Args:
        options: Options for Ray tasks.
        in_options: If True, we are checking the options under the context of
            ".options()".

    Raises:
        ValueError: If a keyword is not a task option, if "max_calls" is given
            while ``in_options`` is True, or if the deprecated placement group
            check fails.
    """
    for keyword, value in options.items():
        spec = task_options.get(keyword)
        if spec is None:
            raise ValueError(
                f"Invalid option keyword {keyword} for remote functions. "
                f"Valid ones are {list(task_options)}."
            )
        spec.validate(keyword, value)

    if "max_calls" in options and in_options:
        raise ValueError("Setting 'max_calls' is not supported in '.options()'.")

    _check_deprecate_placement_group(options)
def validate_actor_options(options: Dict[str, Any], in_options: bool):
    """Options check for Ray actors.

    Each keyword must be a known actor option and its value must pass that
    option's ``validate`` call; errors raised there propagate unchanged.
    A DeprecationWarning is emitted when "object_store_memory" is present.

    Args:
        options: Options for Ray actors.
        in_options: If True, we are checking the options under the context of
            ".options()".

    Raises:
        ValueError: If a keyword is not an actor option, if
            "concurrency_groups" is given while ``in_options`` is True, if
            "get_if_exists" is set without a "name", or if the deprecated
            placement group check fails.
    """
    for keyword, value in options.items():
        spec = actor_options.get(keyword)
        if spec is None:
            raise ValueError(
                f"Invalid option keyword {keyword} for actors. "
                f"Valid ones are {list(actor_options)}."
            )
        spec.validate(keyword, value)

    if "concurrency_groups" in options and in_options:
        raise ValueError(
            "Setting 'concurrency_groups' is not supported in '.options()'."
        )

    # Looking up an existing actor only makes sense for a named actor.
    wants_existing = options.get("get_if_exists")
    if wants_existing and not options.get("name"):
        raise ValueError("The actor name must be specified to use `get_if_exists`.")

    if "object_store_memory" in options:
        deprecation_message = (
            "Setting 'object_store_memory'"
            " for actors is deprecated since it doesn't actually"
            " reserve the required object store memory."
            f" Use object spilling that's enabled by default (https://docs.ray.io/en/{get_ray_doc_version()}/ray-core/objects/object-spilling.html) "  # noqa: E501
            "instead to bypass the object store memory size limitation."
        )
        warnings.warn(deprecation_message, DeprecationWarning, stacklevel=1)

    _check_deprecate_placement_group(options)
def update_options(
    original_options: Dict[str, Any], new_options: Dict[str, Any]
) -> Dict[str, Any]:
    """Return a new dict holding the original options overlaid with the new ones.

    Neither input is modified. The result is a shallow copy: values are shared
    with the inputs, and entries of ``new_options`` replace entries of
    ``original_options`` that have the same key.
    """
    merged: Dict[str, Any] = {}
    merged.update(original_options)
    merged.update(new_options)
    return merged
|