| 12345678910111213141516171819202122232425262728293031 |
- from typing import TYPE_CHECKING, Dict, List
- from ray.rllib.utils.annotations import PublicAPI
- if TYPE_CHECKING:
- from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
- DEFAULT_NUM_CPUS_PER_TASK = 0.5
- @PublicAPI
- def get_offline_io_resource_bundles(
- config: "AlgorithmConfig",
- ) -> List[Dict[str, float]]:
- # DatasetReader is the only offline I/O component today that
- # requires compute resources.
- if config.input_ == "dataset":
- input_config = config.input_config
- # TODO (Kourosh): parallelism is use for reading the dataset, which defaults to
- # num_workers. This logic here relies on the information that dataset reader
- # will have the same logic. So to remove the information leakage, inside
- # Algorithm config, we should set parallelism to num_workers if not specified
- # and only deal with parallelism here or in dataset_reader.py. same thing is
- # true with cpus_per_task.
- parallelism = input_config.get("parallelism", config.get("num_env_runners", 1))
- cpus_per_task = input_config.get(
- "num_cpus_per_read_task", DEFAULT_NUM_CPUS_PER_TASK
- )
- return [{"CPU": cpus_per_task} for _ in range(parallelism)]
- else:
- return []
|