placement_group.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583
  1. import warnings
  2. from typing import Dict, List, Optional, Union
  3. import ray
  4. from ray._common.utils import PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME, hex_to_binary
  5. from ray._private.auto_init_hook import auto_init_ray
  6. from ray._private.client_mode_hook import client_mode_should_convert, client_mode_wrap
  7. from ray._private.label_utils import validate_label_selector
  8. from ray._private.utils import get_ray_doc_version
  9. from ray._raylet import PlacementGroupID
  10. from ray.util.annotations import DeveloperAPI, PublicAPI
  11. from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
  # Supported values for the ``strategy`` argument of ``placement_group()``.
  VALID_PLACEMENT_GROUP_STRATEGIES = {
      "PACK",
      "SPREAD",
      "STRICT_PACK",
      "STRICT_SPREAD",
  }

  # Remote function used by ``PlacementGroup.ready()``. It starts out unset and
  # is created on first use by ``_export_bundle_reservation_check_method_if_needed``.
  bundle_reservation_check = None


  # ``ray.remote`` is only usable at runtime, so the remote function cannot be
  # created at import time. Creating it inside ``ready()`` instead would
  # re-export the function on every call, which hurts performance; see
  # https://github.com/ray-project/ray/issues/6240.
  def _export_bundle_reservation_check_method_if_needed():
      """Create the ``bundle_reservation_check`` remote function exactly once.

      The remote function requests zero CPUs and returns its argument
      unchanged. Calls made after it has been created do nothing.
      """
      global bundle_reservation_check
      if not bundle_reservation_check:

          @ray.remote(num_cpus=0)
          def bundle_reservation_check_func(placement_group):
              return placement_group

          bundle_reservation_check = bundle_reservation_check_func
  @PublicAPI
  class PlacementGroup:
      """A handle to a placement group.

      The handle stores the placement group ID together with a cache of the
      group's bundle specs that is populated on first access.
      """

      @staticmethod
      def empty() -> "PlacementGroup":
          """Return a handle that wraps the nil placement group ID."""
          return PlacementGroup(PlacementGroupID.nil())

      def __init__(
          self,
          id: "ray._raylet.PlacementGroupID",
          bundle_cache: Optional[List[Dict]] = None,
      ):
          # ID of the placement group this handle refers to.
          self.id = id
          # Bundle specs; (re)fetched on access whenever this is None or empty.
          self.bundle_cache = bundle_cache

      @property
      def is_empty(self):
          """Whether this handle wraps the nil placement group ID."""
          return self.id.is_nil()

      def ready(self) -> "ray._raylet.ObjectRef":
          """Returns an ObjectRef to check ready status.

          This API runs a small dummy task to wait for placement group creation.
          It is compatible with ray.get and ray.wait.

          Example:
              .. testcode::

                  import ray

                  pg = ray.util.placement_group([{"CPU": 1}])
                  ray.get(pg.ready())

                  pg = ray.util.placement_group([{"CPU": 1}])
                  ray.wait([pg.ready()])
          """
          self._fill_bundle_cache_if_needed()
          _export_bundle_reservation_check_method_if_needed()

          assert len(self.bundle_cache) != 0, (
              "ready() cannot be called on placement group object with a "
              "bundle length == 0, current bundle length: "
              f"{len(self.bundle_cache)}"
          )

          # The dummy task is scheduled into this group, so it only completes
          # after the group has been created.
          strategy = PlacementGroupSchedulingStrategy(placement_group=self)
          check_task = bundle_reservation_check.options(scheduling_strategy=strategy)
          return check_task.remote(self)

      def wait(self, timeout_seconds: Union[float, int] = 30) -> bool:
          """Wait for the placement group to be ready within the specified time.

          Args:
              timeout_seconds(float|int): Maximum number of seconds to wait.

          Return:
              True if the placement group is created. False otherwise.
          """
          return _call_placement_group_ready(self.id, timeout_seconds)

      @property
      def bundle_specs(self) -> List[Dict]:
          """List[Dict]: Return bundles belonging to this placement group."""
          self._fill_bundle_cache_if_needed()
          return self.bundle_cache

      @property
      def bundle_count(self) -> int:
          """Number of bundles belonging to this placement group."""
          return len(self.bundle_specs)

      def _fill_bundle_cache_if_needed(self) -> None:
          # A None or empty cache is treated as "not fetched yet".
          if self.bundle_cache:
              return
          self.bundle_cache = _get_bundle_cache(self.id)

      def __eq__(self, other):
          return isinstance(other, PlacementGroup) and self.id == other.id

      def __hash__(self):
          return hash(self.id)
  @client_mode_wrap
  def _call_placement_group_ready(pg_id: PlacementGroupID, timeout_seconds: int) -> bool:
      """Ask the core worker to wait up to ``timeout_seconds`` for ``pg_id``.

      Returns the result of ``wait_placement_group_ready``, which is used as
      the return value of ``PlacementGroup.wait``.
      """
      current_worker = ray._private.worker.global_worker
      current_worker.check_connected()
      core = current_worker.core_worker
      return core.wait_placement_group_ready(pg_id, timeout_seconds)
  @client_mode_wrap
  def _get_bundle_cache(pg_id: PlacementGroupID) -> List[Dict]:
      """Return the bundle specs of ``pg_id`` as a list.

      The specs are the values of the ``"bundles"`` mapping in the placement
      group table entry for ``pg_id``.
      """
      current_worker = ray._private.worker.global_worker
      current_worker.check_connected()
      table_entry = ray._private.state.state.placement_group_table(pg_id)
      return list(table_entry["bundles"].values())
  @PublicAPI
  @client_mode_wrap
  def placement_group(
      bundles: List[Dict[str, float]],
      strategy: str = "PACK",
      name: str = "",
      lifetime: Optional[str] = None,
      _soft_target_node_id: Optional[str] = None,
      bundle_label_selector: List[Dict[str, str]] = None,
  ) -> PlacementGroup:
      """Asynchronously creates a PlacementGroup.

      Args:
          bundles: A list of bundles which represent the resource requirements.
          strategy: The strategy to create the placement group.

              - "PACK": Packs Bundles into as few nodes as possible.
              - "SPREAD": Places Bundles across distinct nodes as evenly as
                possible.
              - "STRICT_PACK": Packs Bundles into one node. The group is
                not allowed to span multiple nodes.
              - "STRICT_SPREAD": Places each Bundle on a distinct node.

          name: The name of the placement group.
          lifetime: Either `None`, which means the placement group fate-shares
              with its creator and is deleted once its creator is dead, or
              "detached", which means the placement group lives as a global
              object independent of the creator.
          _soft_target_node_id: (Private, Experimental) Soft hint where bundles
              of this placement group should be placed. The target node is
              specified by its hex ID. If the target node has no available
              resources or died, bundles can be placed elsewhere.
              This currently only works with STRICT_PACK pg.
          bundle_label_selector: A list of label selectors to apply to a
              placement group on a per-bundle level. When given, it must have
              the same length as ``bundles``.

      Raises:
          ValueError: if bundle type is not a list.
          ValueError: if empty bundle or empty resource bundles are given.
          ValueError: if the wrong lifetime arguments are given.

      Return:
          PlacementGroup: Placement group object.
      """
      current_worker = ray._private.worker.global_worker
      current_worker.check_connected()

      validate_placement_group(
          bundles=bundles,
          strategy=strategy,
          lifetime=lifetime,
          _soft_target_node_id=_soft_target_node_id,
          bundle_label_selector=bundle_label_selector,
      )

      # The core worker expects a concrete list of selectors and a detached flag.
      selectors = [] if bundle_label_selector is None else bundle_label_selector
      is_detached = lifetime == "detached"

      new_pg_id = current_worker.core_worker.create_placement_group(
          name,
          bundles,
          strategy,
          is_detached,
          _soft_target_node_id,
          selectors,
      )
      return PlacementGroup(new_pg_id)
  @PublicAPI
  @client_mode_wrap
  def remove_placement_group(placement_group: PlacementGroup) -> None:
      """Asynchronously remove a placement group.

      Args:
          placement_group: The placement group to delete. Must not be None.
      """
      assert placement_group is not None
      current_worker = ray._private.worker.global_worker
      current_worker.check_connected()
      current_worker.core_worker.remove_placement_group(placement_group.id)
  @PublicAPI
  @client_mode_wrap
  def get_placement_group(placement_group_name: str) -> PlacementGroup:
      """Get a placement group object with a global name.

      The lookup is performed in the current worker's namespace.

      Args:
          placement_group_name: The name the placement group was created with.

      Returns:
          The placement group object with the given name.

      Raises:
          ValueError: If ``placement_group_name`` is empty, or if no placement
              group with that name can be found.
      """
      if not placement_group_name:
          raise ValueError("Please supply a non-empty value to get_placement_group")
      worker = ray._private.worker.global_worker
      worker.check_connected()
      placement_group_info = ray._private.state.state.get_placement_group_by_name(
          placement_group_name, worker.namespace
      )
      if placement_group_info is None:
          raise ValueError(
              f"Failed to look up placement group with name: {placement_group_name}"
          )
      return PlacementGroup(
          PlacementGroupID(hex_to_binary(placement_group_info["placement_group_id"]))
      )
  @DeveloperAPI
  @client_mode_wrap
  def placement_group_table(placement_group: PlacementGroup = None) -> dict:
      """Get the state of the placement group from GCS.

      Args:
          placement_group: The placement group whose state to return. When
              None, ``None`` is forwarded as the ID to the state API
              (presumably returning every placement group — confirm against
              ``ray._private.state``).
      """
      current_worker = ray._private.worker.global_worker
      current_worker.check_connected()
      if placement_group is None:
          pg_id = None
      else:
          pg_id = placement_group.id
      return ray._private.state.state.placement_group_table(pg_id)
  @PublicAPI
  def get_current_placement_group() -> Optional[PlacementGroup]:
      """Get the current placement group which a task or actor is using.

      It returns None if there's no current placement group for the worker.
      For example, if you call this method in your driver, it returns None
      (because drivers never belong to any placement group).

      Examples:
          .. testcode::

              import ray
              from ray.util.placement_group import get_current_placement_group
              from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

              @ray.remote
              def f():
                  # This returns the placement group the task f belongs to.
                  # It means this pg is identical to the pg created below.
                  return get_current_placement_group()

              pg = ray.util.placement_group([{"CPU": 2}])
              assert ray.get(f.options(
                      scheduling_strategy=PlacementGroupSchedulingStrategy(
                          placement_group=pg)).remote()) == pg

              # Driver doesn't belong to any placement group,
              # so it returns None.
              assert get_current_placement_group() is None

      Return:
          PlacementGroup: Placement group object.
              None if the current task or actor wasn't
              created with any placement group.
      """
      auto_init_ray()
      if client_mode_should_convert():
          # A Ray Client session is only ever a driver, and drivers never
          # belong to a placement group.
          return None
      current_worker = ray._private.worker.global_worker
      current_worker.check_connected()
      current_pg_id = current_worker.placement_group_id
      return None if current_pg_id.is_nil() else PlacementGroup(current_pg_id)
  def check_placement_group_index(
      placement_group: "PlacementGroup", bundle_index: int
  ) -> None:
      """Validate ``bundle_index`` against ``placement_group``.

      Args:
          placement_group: The placement group a task/actor targets. A group
              with a nil ID means "no placement group is set".
          bundle_index: Index of the target bundle. -1 is always accepted;
              otherwise it must lie in ``[0, bundle_count - 1]``.

      Raises:
          ValueError: If no placement group is set and ``bundle_index`` is not
              -1, or if ``bundle_index`` is outside ``[-1, bundle_count - 1]``.
      """
      assert placement_group is not None
      if placement_group.id.is_nil():
          if bundle_index != -1:
              raise ValueError(
                  "If placement group is not set, "
                  "the value of bundle index must be -1."
              )
          return

      # Read the count once: the property may trigger a fetch of bundle specs.
      bundle_count = placement_group.bundle_count
      if bundle_index >= bundle_count or bundle_index < -1:
          # Valid indexes are inclusive 0..bundle_count - 1 (plus -1).
          raise ValueError(
              f"placement group bundle index {bundle_index} "
              f"is invalid. Valid placement group indexes: "
              f"0-{bundle_count - 1}"
          )
  def validate_placement_group(
      bundles: List[Dict[str, float]],
      strategy: str = "PACK",
      lifetime: Optional[str] = None,
      _soft_target_node_id: Optional[str] = None,
      bundle_label_selector: Optional[List[Dict[str, str]]] = None,
  ) -> None:
      """Validates inputs for placement_group.

      Args:
          bundles: Resource dicts, one per bundle.
          strategy: One of ``VALID_PLACEMENT_GROUP_STRATEGIES``.
          lifetime: None or "detached".
          _soft_target_node_id: Optional hex node ID; only allowed together
              with the STRICT_PACK strategy.
          bundle_label_selector: Optional list of label selectors, one per
              bundle.

      Raises:
          ValueError: If ``_soft_target_node_id`` is combined with a strategy
              other than STRICT_PACK or is the nil node ID; if ``bundles`` is
              malformed; if ``bundle_label_selector`` is not a list, holds an
              invalid selector, or differs in length from ``bundles``; if
              ``strategy`` is unknown; or if ``lifetime`` is not None or
              "detached".
      """
      if _soft_target_node_id and strategy != "STRICT_PACK":
          raise ValueError(
              "_soft_target_node_id currently only works "
              f"with STRICT_PACK but got {strategy}"
          )
      if _soft_target_node_id and ray.NodeID.from_hex(_soft_target_node_id).is_nil():
          raise ValueError(
              f"Invalid hex ID of _soft_target_node_id, got {_soft_target_node_id}"
          )
      _validate_bundles(bundles)
      if bundle_label_selector is not None:
          # Type-check the selectors before calling len() on them, so a
          # non-list value raises the documented ValueError, not a TypeError.
          _validate_bundle_label_selector(bundle_label_selector)
          if len(bundles) != len(bundle_label_selector):
              raise ValueError(
                  f"Invalid bundle label selector {bundle_label_selector}. "
                  "The length of `bundle_label_selector` should equal the length of `bundles`."
              )
      if strategy not in VALID_PLACEMENT_GROUP_STRATEGIES:
          raise ValueError(
              f"Invalid placement group strategy {strategy}. "
              f"Supported strategies are: {VALID_PLACEMENT_GROUP_STRATEGIES}."
          )
      if lifetime not in [None, "detached"]:
          raise ValueError(
              "Placement group `lifetime` argument must be either `None` or "
              f"'detached'. Got {lifetime}."
          )
  def _validate_bundles(bundles: List[Dict[str, float]]):
      """Check that ``bundles`` is a non-empty list of resource dictionaries.

      Each bundle must be a dict mapping ``str`` resource names to ``int`` or
      ``float`` amounts, with at least one non-zero amount. A
      ``DeprecationWarning`` is emitted for any bundle that sets
      ``object_store_memory``.

      Raises:
          ValueError: If ``bundles`` is not a list, is empty, or contains a
              malformed, empty, or all-zero bundle.
      """
      if not isinstance(bundles, list):
          raise ValueError(
              "Placement group bundles must be a list, " f"got {type(bundles)}."
          )
      if not bundles:
          raise ValueError(
              "Bundles must be a non-empty list of resource "
              'dictionaries. For example: `[{"CPU": 1.0}, {"GPU": 1.0}]`. '
              "Got empty list instead."
          )

      for spec in bundles:
          well_formed = (
              isinstance(spec, dict)
              and all(isinstance(resource_name, str) for resource_name in spec)
              and all(isinstance(amount, (int, float)) for amount in spec.values())
          )
          if not well_formed:
              raise ValueError(
                  "Bundles must be a non-empty list of "
                  "resource dictionaries. For example: "
                  '`[{"CPU": 1.0}, {"GPU": 1.0}]`.'
              )
          # all() is True for an empty dict, so this rejects both empty
          # bundles and bundles whose amounts are all zero.
          if all(amount == 0 for amount in spec.values()):
              raise ValueError(
                  "Bundles cannot be an empty dictionary or "
                  f"resources with only 0 values. Bundles: {bundles}"
              )
          if "object_store_memory" in spec:
              warnings.warn(
                  "Setting 'object_store_memory' for"
                  " bundles is deprecated since it doesn't actually"
                  " reserve the required object store memory."
                  f" Use object spilling that's enabled by default (https://docs.ray.io/en/{get_ray_doc_version()}/ray-core/objects/object-spilling.html) "  # noqa: E501
                  "instead to bypass the object store memory size limitation.",
                  DeprecationWarning,
                  stacklevel=1,
              )
  def _validate_bundle_label_selector(bundle_label_selector: List[Dict[str, str]]):
      """Validate the per-bundle label selectors of a placement group.

      Args:
          bundle_label_selector: A list holding one ``{label_key: label_value}``
              dict per bundle. An empty list is accepted as a no-op.

      Raises:
          ValueError: If the argument is not a list, if any entry is not a
              dict with ``str`` keys and values, or if
              ``validate_label_selector`` reports an error for any entry.
      """
      if not isinstance(bundle_label_selector, list):
          raise ValueError(
              "Placement group bundle_label_selector must be a list, "
              f"got {type(bundle_label_selector)}."
          )
      # An empty list simply skips the loop below.
      for label_selector in bundle_label_selector:
          if (
              not isinstance(label_selector, dict)
              or not all(isinstance(k, str) for k in label_selector)
              or not all(isinstance(v, str) for v in label_selector.values())
          ):
              raise ValueError(
                  "Bundle label selector must be a list of string dictionary"
                  " label selectors. For example: "
                  '`[{"ray.io/market_type": "spot"}, {"ray.io/accelerator-type": "A100"}]`.'
              )
          # Delegate key/value syntax checks; a non-empty return is an error text.
          error_message = validate_label_selector(label_selector)
          if error_message:
              raise ValueError(
                  "Invalid label selector provided in bundle_label_selector list."
                  f" Detailed error: '{error_message}'"
              )
  def _valid_resource_shape(resources, bundle_specs):
      """Return True if the ``resources`` request fits into at least one bundle.

      A request fits a bundle when no requested amount exceeds what the bundle
      provides; resources absent from the bundle count as 0. The
      ``PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME`` resource is ignored. Returns
      False when ``bundle_specs`` is empty.
      """

      def _falls_short(spec):
          # The "bundle" resource is added automatically to every node that
          # holds bundles of the placement group, so it is never checked.
          return any(
              spec.get(resource_name, 0) < wanted
              for resource_name, wanted in resources.items()
              if resource_name != PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME
          )

      return any(not _falls_short(spec) for spec in bundle_specs)
  def _validate_resource_shape(
      placement_group, resources, placement_resources, task_or_actor_repr
  ):
      """Ensure a task/actor's resource requests fit the placement group.

      Args:
          placement_group: The placement group whose ``bundle_specs`` are checked.
          resources: The task/actor resource request.
          placement_resources: The placement resources required for actors.
          task_or_actor_repr: Human-readable name used in error messages.

      Raises:
          ValueError: If either request cannot fit into any single bundle.
              ``resources`` is reported first.
      """
      bundle_specs = placement_group.bundle_specs
      # Both checks are evaluated before either error is raised.
      fits_resources = _valid_resource_shape(resources, bundle_specs)
      fits_placement = _valid_resource_shape(placement_resources, bundle_specs)

      if not fits_resources:
          raise ValueError(
              f"Cannot schedule {task_or_actor_repr} with "
              "the placement group because the resource request "
              f"{resources} cannot fit into any bundles for "
              f"the placement group, {bundle_specs}."
          )
      if not fits_placement:
          # Happens for the default actor case. placement_resources is not a
          # user-facing concept, so the message is phrased in terms of CPUs.
          raise ValueError(
              f"Cannot schedule {task_or_actor_repr} with "
              "the placement group because the actor requires "
              f"{placement_resources.get('CPU', 0)} CPU for "
              "creation, but it cannot "
              f"fit into any bundles for the placement group, "
              f"{bundle_specs}. Consider "
              "creating a placement group with CPU resources."
          )
  def _configure_placement_group_based_on_context(
      placement_group_capture_child_tasks: bool,
      bundle_index: int,
      resources: Dict,
      placement_resources: Dict,
      task_or_actor_repr: str,
      placement_group: Union[PlacementGroup, str, None] = "default",
  ) -> PlacementGroup:
      """Configure the placement group based on the given context.

      Based on the given context, this API returns the placement group instance
      for task/actor scheduling.

      Params:
          placement_group_capture_child_tasks: Whether or not the
              placement group needs to be captured from the global
              context.
          bundle_index: The bundle index for tasks/actor scheduling.
          resources: The scheduling resources.
          placement_resources: The scheduling placement resources for
              actors.
          task_or_actor_repr: The repr of task or actor
              function/class descriptor.
          placement_group: The placement group instance.
              - "default": Default placement group argument. Currently,
                  the default behavior is to capture the parent task's
                  placement group if placement_group_capture_child_tasks
                  is set.
              - None: means placement group is explicitly not configured.
              - Placement group instance: In this case, do nothing.

      Returns:
          Placement group instance based on the given context.

      Raises:
          ValueError: If the bundle index is invalid for the placement group
              or the requested resources shape doesn't fit to any
              bundles.
      """
      assert placement_group_capture_child_tasks is not None
      assert resources is not None

      # Resolve the "default" sentinel. The parent's placement group is only
      # captured when placement_group_capture_child_tasks is set.
      if placement_group == "default":
          if placement_group_capture_child_tasks:
              placement_group = get_current_placement_group()
          else:
              placement_group = PlacementGroup.empty()

      # None (explicitly unset, or no current placement group) means "empty".
      if not placement_group:
          placement_group = PlacementGroup.empty()
      assert isinstance(placement_group, PlacementGroup)

      check_placement_group_index(placement_group, bundle_index)

      # Resource shapes only matter when a real placement group is in use.
      if not placement_group.is_empty:
          _validate_resource_shape(
              placement_group, resources, placement_resources, task_or_actor_repr
          )
      return placement_group