options.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. from typing import Any, Dict, Optional
  2. from ray._common import ray_option_utils
  3. from ray.util.placement_group import PlacementGroup, check_placement_group_index
  4. from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
  5. def validate_options(kwargs_dict: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
  6. if kwargs_dict is None:
  7. return None
  8. if len(kwargs_dict) == 0:
  9. return None
  10. out = {}
  11. for k, v in kwargs_dict.items():
  12. if k not in ray_option_utils.valid_options:
  13. raise ValueError(
  14. f"Invalid option keyword: '{k}'. "
  15. f"{ray_option_utils.remote_args_error_string}"
  16. )
  17. ray_option_utils.valid_options[k].validate(k, v)
  18. out[k] = v
  19. # Validate placement setting similar to the logic in ray/actor.py and
  20. # ray/remote_function.py. The difference is that when
  21. # placement_group = default and placement_group_capture_child_tasks
  22. # specified, placement group cannot be resolved at client. So this check
  23. # skips this case and relies on server to enforce any condition.
  24. bundle_index = out.get("placement_group_bundle_index", None)
  25. pg = out.get("placement_group", None)
  26. scheduling_strategy = out.get("scheduling_strategy", None)
  27. if isinstance(scheduling_strategy, PlacementGroupSchedulingStrategy):
  28. pg = scheduling_strategy.placement_group
  29. bundle_index = scheduling_strategy.placement_group_bundle_index
  30. if bundle_index is not None:
  31. if pg is None:
  32. pg = PlacementGroup.empty()
  33. if pg == "default" and (
  34. out.get("placement_group_capture_child_tasks", None) is None
  35. ):
  36. pg = PlacementGroup.empty()
  37. if isinstance(pg, PlacementGroup):
  38. check_placement_group_index(pg, bundle_index)
  39. return out