ray_option_utils.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. """Manage, parse and validate options for Ray tasks, actors and actor methods."""
import math
import warnings
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, Tuple, Union

import ray
from ray._private import ray_constants
from ray._private.label_utils import (
    validate_fallback_strategy,
    validate_label_selector,
)
from ray._private.utils import get_ray_doc_version
from ray.util.placement_group import PlacementGroup
from ray.util.scheduling_strategies import (
    NodeAffinitySchedulingStrategy,
    NodeLabelSchedulingStrategy,
    PlacementGroupSchedulingStrategy,
)
  18. @dataclass
  19. class Option:
  20. # Type constraint of an option.
  21. type_constraint: Optional[Union[type, Tuple[type]]] = None
  22. # Value constraint of an option.
  23. # The callable should return None if there is no error.
  24. # Otherwise, return the error message.
  25. value_constraint: Optional[Callable[[Any], Optional[str]]] = None
  26. # Default value.
  27. default_value: Any = None
  28. def validate(self, keyword: str, value: Any):
  29. """Validate the option."""
  30. if self.type_constraint is not None:
  31. if not isinstance(value, self.type_constraint):
  32. raise TypeError(
  33. f"The type of keyword '{keyword}' must be {self.type_constraint}, "
  34. f"but received type {type(value)}"
  35. )
  36. if self.value_constraint is not None:
  37. possible_error_message = self.value_constraint(value)
  38. if possible_error_message:
  39. raise ValueError(possible_error_message)
  40. def _counting_option(name: str, infinite: bool = True, default_value: Any = None):
  41. """This is used for positive and discrete options.
  42. Args:
  43. name: The name of the option keyword.
  44. infinite: If True, user could use -1 to represent infinity.
  45. default_value: The default value for this option.
  46. Returns:
  47. An Option object.
  48. """
  49. if infinite:
  50. return Option(
  51. (int, type(None)),
  52. lambda x: None
  53. if (x is None or x >= -1)
  54. else f"The keyword '{name}' only accepts None, 0, -1"
  55. " or a positive integer, where -1 represents infinity.",
  56. default_value=default_value,
  57. )
  58. return Option(
  59. (int, type(None)),
  60. lambda x: None
  61. if (x is None or x >= 0)
  62. else f"The keyword '{name}' only accepts None, 0 or a positive integer.",
  63. default_value=default_value,
  64. )
  65. def _validate_resource_quantity(name, quantity):
  66. if quantity < 0:
  67. return f"The quantity of resource {name} cannot be negative"
  68. if (
  69. isinstance(quantity, float)
  70. and quantity != 0.0
  71. and int(quantity * ray._raylet.RESOURCE_UNIT_SCALING) == 0
  72. ):
  73. return (
  74. f"The precision of the fractional quantity of resource {name}"
  75. " cannot go beyond 0.0001"
  76. )
  77. resource_name = "GPU" if name == "num_gpus" else name
  78. if resource_name in ray._private.accelerators.get_all_accelerator_resource_names():
  79. (
  80. valid,
  81. error_message,
  82. ) = ray._private.accelerators.get_accelerator_manager_for_resource(
  83. resource_name
  84. ).validate_resource_request_quantity(
  85. quantity
  86. )
  87. if not valid:
  88. return error_message
  89. return None
  90. def _resource_option(name: str, default_value: Any = None):
  91. """This is used for resource related options."""
  92. return Option(
  93. (float, int, type(None)),
  94. lambda x: None if (x is None) else _validate_resource_quantity(name, x),
  95. default_value=default_value,
  96. )
  97. def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]:
  98. if resources is None:
  99. return None
  100. if "CPU" in resources or "GPU" in resources:
  101. return (
  102. "Use the 'num_cpus' and 'num_gpus' keyword instead of 'CPU' and 'GPU' "
  103. "in 'resources' keyword"
  104. )
  105. for name, quantity in resources.items():
  106. possible_error_message = _validate_resource_quantity(name, quantity)
  107. if possible_error_message:
  108. return possible_error_message
  109. return None
  110. _common_options = {
  111. "label_selector": Option((dict, type(None)), lambda x: validate_label_selector(x)),
  112. "fallback_strategy": Option(
  113. (list, type(None)), lambda x: validate_fallback_strategy(x)
  114. ),
  115. "accelerator_type": Option((str, type(None))),
  116. "memory": _resource_option("memory"),
  117. "name": Option((str, type(None))),
  118. "num_cpus": _resource_option("num_cpus"),
  119. "num_gpus": _resource_option("num_gpus"),
  120. "object_store_memory": _counting_option("object_store_memory", False),
  121. # TODO(suquark): "placement_group", "placement_group_bundle_index"
  122. # and "placement_group_capture_child_tasks" are deprecated,
  123. # use "scheduling_strategy" instead.
  124. "placement_group": Option(
  125. (type(None), str, PlacementGroup), default_value="default"
  126. ),
  127. "placement_group_bundle_index": Option(int, default_value=-1),
  128. "placement_group_capture_child_tasks": Option((bool, type(None))),
  129. "resources": Option((dict, type(None)), lambda x: _validate_resources(x)),
  130. "runtime_env": Option((dict, type(None))),
  131. "scheduling_strategy": Option(
  132. (
  133. type(None),
  134. str,
  135. PlacementGroupSchedulingStrategy,
  136. NodeAffinitySchedulingStrategy,
  137. NodeLabelSchedulingStrategy,
  138. )
  139. ),
  140. "enable_task_events": Option(bool, default_value=True),
  141. "_labels": Option((dict, type(None))),
  142. }
  143. def issubclass_safe(obj: Any, cls_: type) -> bool:
  144. try:
  145. return issubclass(obj, cls_)
  146. except TypeError:
  147. return False
  148. _task_only_options = {
  149. "max_calls": _counting_option("max_calls", False, default_value=0),
  150. # Normal tasks may be retried on failure this many times.
  151. # TODO(swang): Allow this to be set globally for an application.
  152. "max_retries": _counting_option(
  153. "max_retries", default_value=ray_constants.DEFAULT_TASK_MAX_RETRIES
  154. ),
  155. # override "_common_options"
  156. "num_cpus": _resource_option("num_cpus", default_value=1),
  157. "num_returns": Option(
  158. (int, str, type(None)),
  159. lambda x: None
  160. if (x is None or x == "dynamic" or x == "streaming" or x >= 0)
  161. else "Default None. When None is passed, "
  162. "The default value is 1 for a task and actor task, and "
  163. "'streaming' for generator tasks and generator actor tasks. "
  164. "The keyword 'num_returns' only accepts None, "
  165. "a non-negative integer, "
  166. "'streaming' (for generators), or 'dynamic'. 'dynamic' flag "
  167. "will be deprecated in the future, and it is recommended to use "
  168. "'streaming' instead.",
  169. default_value=None,
  170. ),
  171. "object_store_memory": Option( # override "_common_options"
  172. (int, type(None)),
  173. lambda x: None
  174. if (x is None)
  175. else "Setting 'object_store_memory' is not implemented for tasks",
  176. ),
  177. "retry_exceptions": Option(
  178. (bool, list, tuple),
  179. lambda x: None
  180. if (
  181. isinstance(x, bool)
  182. or (
  183. isinstance(x, (list, tuple))
  184. and all(issubclass_safe(x_, Exception) for x_ in x)
  185. )
  186. )
  187. else "retry_exceptions must be either a boolean or a list of exceptions",
  188. default_value=False,
  189. ),
  190. "_generator_backpressure_num_objects": Option(
  191. (int, type(None)),
  192. lambda x: None
  193. if x != 0
  194. else (
  195. "_generator_backpressure_num_objects=0 is not allowed. "
  196. "Use a value > 0. If the value is equal to 1, the behavior "
  197. "is identical to Python generator (generator 1 object "
  198. "whenever `next` is called). Use -1 to disable this feature. "
  199. ),
  200. ),
  201. }
  202. _actor_only_options = {
  203. "concurrency_groups": Option((list, dict, type(None))),
  204. "enable_tensor_transport": Option((bool, type(None)), default_value=None),
  205. "lifetime": Option(
  206. (str, type(None)),
  207. lambda x: None
  208. if x in (None, "detached", "non_detached")
  209. else "actor `lifetime` argument must be one of 'detached', "
  210. "'non_detached' and 'None'.",
  211. ),
  212. "max_concurrency": _counting_option("max_concurrency", False),
  213. "max_restarts": _counting_option("max_restarts", default_value=0),
  214. "max_task_retries": _counting_option("max_task_retries", default_value=0),
  215. "max_pending_calls": _counting_option("max_pending_calls", default_value=-1),
  216. "namespace": Option((str, type(None))),
  217. "get_if_exists": Option(bool, default_value=False),
  218. "allow_out_of_order_execution": Option((bool, type(None))),
  219. }
  220. # Priority is important here because during dictionary update, same key with higher
  221. # priority overrides the same key with lower priority. We make use of priority
  222. # to set the correct default value for tasks / actors.
  223. # priority: _common_options > _actor_only_options > _task_only_options
  224. valid_options: Dict[str, Option] = {
  225. **_task_only_options,
  226. **_actor_only_options,
  227. **_common_options,
  228. }
  229. # priority: _task_only_options > _common_options
  230. task_options: Dict[str, Option] = {**_common_options, **_task_only_options}
  231. # priority: _actor_only_options > _common_options
  232. actor_options: Dict[str, Option] = {**_common_options, **_actor_only_options}
  233. remote_args_error_string = (
  234. "The @ray.remote decorator must be applied either with no arguments and no "
  235. "parentheses, for example '@ray.remote', or it must be applied using some of "
  236. f"the arguments in the list {list(valid_options.keys())}, for example "
  237. "'@ray.remote(num_returns=2, resources={\"CustomResource\": 1})'."
  238. )
  239. def _check_deprecate_placement_group(options: Dict[str, Any]):
  240. """Check if deprecated placement group option exists."""
  241. placement_group = options.get("placement_group", "default")
  242. scheduling_strategy = options.get("scheduling_strategy")
  243. # TODO(suquark): @ray.remote(placement_group=None) is used in
  244. # "python/ray.data._internal/remote_fn.py" and many other places,
  245. # while "ray.data.read_api.read_datasource" set "scheduling_strategy=SPREAD".
  246. # This might be a bug, but it is also ok to allow them co-exist.
  247. if (placement_group not in ("default", None)) and (scheduling_strategy is not None):
  248. raise ValueError(
  249. "Placement groups should be specified via the "
  250. "scheduling_strategy option. "
  251. "The placement_group option is deprecated."
  252. )
  253. def _warn_if_using_deprecated_placement_group(
  254. options: Dict[str, Any], caller_stacklevel: int
  255. ):
  256. placement_group = options["placement_group"]
  257. placement_group_bundle_index = options["placement_group_bundle_index"]
  258. placement_group_capture_child_tasks = options["placement_group_capture_child_tasks"]
  259. if placement_group != "default":
  260. warnings.warn(
  261. "placement_group parameter is deprecated. Use "
  262. "scheduling_strategy=PlacementGroupSchedulingStrategy(...) "
  263. "instead, see the usage at "
  264. f"https://docs.ray.io/en/{get_ray_doc_version()}/ray-core/package-ref.html#ray-remote.", # noqa: E501
  265. DeprecationWarning,
  266. stacklevel=caller_stacklevel + 1,
  267. )
  268. if placement_group_bundle_index != -1:
  269. warnings.warn(
  270. "placement_group_bundle_index parameter is deprecated. Use "
  271. "scheduling_strategy=PlacementGroupSchedulingStrategy(...) "
  272. "instead, see the usage at "
  273. f"https://docs.ray.io/en/{get_ray_doc_version()}/ray-core/package-ref.html#ray-remote.", # noqa: E501
  274. DeprecationWarning,
  275. stacklevel=caller_stacklevel + 1,
  276. )
  277. if placement_group_capture_child_tasks:
  278. warnings.warn(
  279. "placement_group_capture_child_tasks parameter is deprecated. Use "
  280. "scheduling_strategy=PlacementGroupSchedulingStrategy(...) "
  281. "instead, see the usage at "
  282. f"https://docs.ray.io/en/{get_ray_doc_version()}/ray-core/package-ref.html#ray-remote.", # noqa: E501
  283. DeprecationWarning,
  284. stacklevel=caller_stacklevel + 1,
  285. )
  286. def validate_task_options(options: Dict[str, Any], in_options: bool):
  287. """Options check for Ray tasks.
  288. Args:
  289. options: Options for Ray tasks.
  290. in_options: If True, we are checking the options under the context of
  291. ".options()".
  292. """
  293. for k, v in options.items():
  294. if k not in task_options:
  295. raise ValueError(
  296. f"Invalid option keyword {k} for remote functions. "
  297. f"Valid ones are {list(task_options)}."
  298. )
  299. task_options[k].validate(k, v)
  300. if in_options and "max_calls" in options:
  301. raise ValueError("Setting 'max_calls' is not supported in '.options()'.")
  302. _check_deprecate_placement_group(options)
  303. def validate_actor_options(options: Dict[str, Any], in_options: bool):
  304. """Options check for Ray actors.
  305. Args:
  306. options: Options for Ray actors.
  307. in_options: If True, we are checking the options under the context of
  308. ".options()".
  309. """
  310. for k, v in options.items():
  311. if k not in actor_options:
  312. raise ValueError(
  313. f"Invalid option keyword {k} for actors. "
  314. f"Valid ones are {list(actor_options)}."
  315. )
  316. actor_options[k].validate(k, v)
  317. if in_options and "concurrency_groups" in options:
  318. raise ValueError(
  319. "Setting 'concurrency_groups' is not supported in '.options()'."
  320. )
  321. if options.get("get_if_exists") and not options.get("name"):
  322. raise ValueError("The actor name must be specified to use `get_if_exists`.")
  323. if "object_store_memory" in options:
  324. warnings.warn(
  325. "Setting 'object_store_memory'"
  326. " for actors is deprecated since it doesn't actually"
  327. " reserve the required object store memory."
  328. f" Use object spilling that's enabled by default (https://docs.ray.io/en/{get_ray_doc_version()}/ray-core/objects/object-spilling.html) " # noqa: E501
  329. "instead to bypass the object store memory size limitation.",
  330. DeprecationWarning,
  331. stacklevel=1,
  332. )
  333. _check_deprecate_placement_group(options)
  334. def update_options(
  335. original_options: Dict[str, Any], new_options: Dict[str, Any]
  336. ) -> Dict[str, Any]:
  337. """Update original options with new options and return.
  338. The returned updated options contain shallow copy of original options.
  339. """
  340. return {**original_options, **new_options}