data_config.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import copy
  2. from collections import defaultdict
  3. from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Union
  4. from ray.actor import ActorHandle
  5. from ray.util.annotations import DeveloperAPI, PublicAPI
  6. if TYPE_CHECKING:
  7. from ray.data import DataIterator, Dataset, ExecutionOptions, NodeIdStr
  @PublicAPI(stability="stable")
  class DataConfig:
      """Controls how Train prepares its datasets and hands them to workers.

      Advanced users can subclass this and override ``configure()`` to plug in
      their own data preprocessing.
      """

      def __init__(
          self,
          datasets_to_split: Union[Literal["all"], List[str]] = "all",
          execution_options: Optional[
              Union["ExecutionOptions", Dict[str, "ExecutionOptions"]]
          ] = None,
          enable_shard_locality: bool = True,
      ):
          """Build a DataConfig.

          Args:
              datasets_to_split: Which datasets get split among the workers:
                  either the string "all" (the default, i.e. every dataset) or
                  a list of dataset names.
              execution_options: Execution options handed to Ray Data. Either:
                  1. One ExecutionOptions object, used for every dataset.
                  2. A dict from dataset name to ExecutionOptions; a name that
                     is missing from the dict falls back to
                     ``DataConfig.default_ingest_options()``.
                  When omitted, every dataset uses the default ingest options.
                  When overriding, start from
                  ``DataConfig.default_ingest_options()``.
              enable_shard_locality: When true (the default), the worker node
                  ids are passed as locality hints while sharding datasets so
                  that cross-node data transfer is minimized.

          Raises:
              TypeError: If ``datasets_to_split`` is neither a list nor "all".
          """
          from ray.data import ExecutionOptions

          split_spec_ok = (
              isinstance(datasets_to_split, list) or datasets_to_split == "all"
          )
          if not split_spec_ok:
              raise TypeError(
                  "`datasets_to_split` should be a 'all' or a list of strings of "
                  "dataset names. Received "
                  f"{type(datasets_to_split).__name__} with value {datasets_to_split}."
              )
          self._datasets_to_split = datasets_to_split

          # A single ExecutionOptions object replaces the ingest defaults as the
          # fallback for every dataset; with None (or a dict) the ingest defaults
          # remain the fallback.
          ingest_defaults = DataConfig.default_ingest_options()
          fallback_options = (
              execution_options
              if isinstance(execution_options, ExecutionOptions)
              else ingest_defaults
          )

          def _fresh_fallback() -> "ExecutionOptions":
              # Each unseen dataset name gets its own copy of the fallback.
              return copy.deepcopy(fallback_options)

          self._execution_options: Dict[str, "ExecutionOptions"] = defaultdict(
              _fresh_fallback
          )
          # Per-dataset overrides are layered on top of the fallback factory.
          if isinstance(execution_options, dict):
              self._execution_options.update(execution_options)

          self._enable_shard_locality = enable_shard_locality
          self._num_train_cpus, self._num_train_gpus = 0.0, 0.0

      def set_train_total_resources(self, num_train_cpus: float, num_train_gpus: float):
          """Record the total CPUs and GPUs consumed by training.

          ``configure()`` adds these amounts to a dataset's ``exclude_resources``
          whenever that dataset's resource limits are still at their default.
          """
          # TODO: We may also include other resources besides CPU and GPU.
          self._num_train_cpus, self._num_train_gpus = num_train_cpus, num_train_gpus

      def _get_execution_options(self, dataset_name: str) -> "ExecutionOptions":
          """Return a deep copy of the execution options stored for ``dataset_name``."""
          stored_options = self._execution_options[dataset_name]
          return copy.deepcopy(stored_options)

      @DeveloperAPI
      def configure(
          self,
          datasets: Dict[str, "Dataset"],
          world_size: int,
          worker_handles: Optional[List[ActorHandle]],
          worker_node_ids: Optional[List["NodeIdStr"]],
          **kwargs,
      ) -> List[Dict[str, "DataIterator"]]:
          """Decide which data each Train worker receives.

          Args:
              datasets: The datasets dict the user passed to Train.
              world_size: Total number of Train workers.
              worker_handles: Actor handles of the Train workers.
              worker_node_ids: Node ids of the Train workers.
              kwargs: Forwards compatibility placeholder.

          Returns:
              One dict per worker, so the list has exactly ``world_size``
              entries. Each dict maps a dataset name to the ``DataIterator``
              assigned to that worker.
          """
          from ray.data._internal.execution.interfaces.execution_options import (
              ExecutionResources,
          )

          per_worker = [{} for _ in range(world_size)]

          # Datasets that have no name yet take their key in the dict as name.
          for key, unnamed_candidate in datasets.items():
              if unnamed_candidate.name is None:
                  unnamed_candidate.set_name(key)

          split_names = set(
              datasets.keys()
              if self._datasets_to_split == "all"
              else self._datasets_to_split
          )

          if self._enable_shard_locality:
              locality_hints = worker_node_ids
          else:
              locality_hints = None

          for name, source_ds in datasets.items():
              options = self._get_execution_options(name)

              if options.is_resource_limits_default():
                  # If "resource_limits" is not overridden by the user,
                  # add training-reserved resources to Data's exclude_resources.
                  train_reserved = ExecutionResources(
                      cpu=self._num_train_cpus, gpu=self._num_train_gpus
                  )
                  options.exclude_resources = options.exclude_resources.add(
                      train_reserved
                  )

              # Attach the options to a copy rather than to the caller's dataset.
              configured_ds = source_ds.copy(source_ds)
              configured_ds.context.execution_options = options

              if name not in split_names:
                  # Unsplit datasets: each worker gets its own iterator.
                  for worker_slot in per_worker:
                      worker_slot[name] = configured_ds.iterator()
                  continue

              shards = configured_ds.streaming_split(
                  world_size, equal=True, locality_hints=locality_hints
              )
              for rank, shard in enumerate(shards):
                  per_worker[rank][name] = shard

          return per_worker

      @staticmethod
      def default_ingest_options() -> "ExecutionOptions":
          """Return the default Ray Data execution options for data ingest.

          The values are carried over from the execution options already set on
          the current DataContext.
          """
          from ray.data import ExecutionOptions
          from ray.data.context import DataContext

          current = DataContext.get_current().execution_options
          return ExecutionOptions(
              # TODO(hchen): Re-enable `locality_with_output` by default after fixing
              # https://github.com/ray-project/ray/issues/40607
              locality_with_output=current.locality_with_output,
              resource_limits=current.resource_limits,
              exclude_resources=current.exclude_resources,
              preserve_order=current.preserve_order,
              verbose_progress=current.verbose_progress,
          )
  143. )