resource.py 1.2 KB

12345678910111213141516171819202122232425262728293031
  1. from typing import TYPE_CHECKING, Dict, List
  2. from ray.rllib.utils.annotations import PublicAPI
  3. if TYPE_CHECKING:
  4. from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
# Fallback CPU amount reserved per dataset read task; used by
# get_offline_io_resource_bundles when the input_config does not set
# "num_cpus_per_read_task".
DEFAULT_NUM_CPUS_PER_TASK: float = 0.5
  6. @PublicAPI
  7. def get_offline_io_resource_bundles(
  8. config: "AlgorithmConfig",
  9. ) -> List[Dict[str, float]]:
  10. # DatasetReader is the only offline I/O component today that
  11. # requires compute resources.
  12. if config.input_ == "dataset":
  13. input_config = config.input_config
  14. # TODO (Kourosh): parallelism is use for reading the dataset, which defaults to
  15. # num_workers. This logic here relies on the information that dataset reader
  16. # will have the same logic. So to remove the information leakage, inside
  17. # Algorithm config, we should set parallelism to num_workers if not specified
  18. # and only deal with parallelism here or in dataset_reader.py. same thing is
  19. # true with cpus_per_task.
  20. parallelism = input_config.get("parallelism", config.get("num_env_runners", 1))
  21. cpus_per_task = input_config.get(
  22. "num_cpus_per_read_task", DEFAULT_NUM_CPUS_PER_TASK
  23. )
  24. return [{"CPU": cpus_per_task} for _ in range(parallelism)]
  25. else:
  26. return []