| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import json
- import logging
- import os
- import subprocess
- import sys
- from typing import List, Optional, Tuple
- from ray._private.accelerators.accelerator import AcceleratorManager
- from ray._private.ray_constants import env_bool
- logger = logging.getLogger(__name__)
- NEURON_RT_VISIBLE_CORES_ENV_VAR = "NEURON_RT_VISIBLE_CORES"
- NOSET_AWS_NEURON_RT_VISIBLE_CORES_ENV_VAR = (
- "RAY_EXPERIMENTAL_NOSET_NEURON_RT_VISIBLE_CORES"
- )
- # https://awsdocs-neuron.readthedocs-hosted.com/en/latest/general/arch/neuron-hardware/inf2-arch.html#aws-inf2-arch
- # https://awsdocs-neuron.readthedocs-hosted.com/en/latest/general/arch/neuron-hardware/trn1-arch.html#aws-trn1-arch
- # Subject to removal after the information is available via public API
- AWS_NEURON_INSTANCE_MAP = {
- "trn1.2xlarge": 2,
- "trn1.32xlarge": 32,
- "trn1n.32xlarge": 32,
- "inf2.xlarge": 2,
- "inf2.8xlarge": 2,
- "inf2.24xlarge": 12,
- "inf2.48xlarge": 24,
- }
- class NeuronAcceleratorManager(AcceleratorManager):
- """AWS Inferentia and Trainium accelerators."""
- @staticmethod
- def get_resource_name() -> str:
- return "neuron_cores"
- @staticmethod
- def get_visible_accelerator_ids_env_var() -> str:
- return NEURON_RT_VISIBLE_CORES_ENV_VAR
- @staticmethod
- def get_current_process_visible_accelerator_ids() -> Optional[List[str]]:
- neuron_visible_cores = os.environ.get(
- NeuronAcceleratorManager.get_visible_accelerator_ids_env_var(), None
- )
- if neuron_visible_cores is None:
- return None
- if neuron_visible_cores == "":
- return []
- return list(neuron_visible_cores.split(","))
- @staticmethod
- def get_current_node_num_accelerators() -> int:
- """
- Attempt to detect the number of Neuron cores on this machine.
- Returns:
- The number of Neuron cores if any were detected, otherwise 0.
- """
- nc_count: int = 0
- neuron_path = "/opt/aws/neuron/bin/"
- if sys.platform.startswith("linux") and os.path.isdir(neuron_path):
- result = subprocess.run(
- [os.path.join(neuron_path, "neuron-ls"), "--json-output"],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- )
- if result.returncode == 0 and result.stdout:
- neuron_devices = json.loads(result.stdout)
- for neuron_device in neuron_devices:
- nc_count += neuron_device.get("nc_count", 0)
- return nc_count
- @staticmethod
- def get_current_node_accelerator_type() -> Optional[str]:
- from ray.util.accelerators import AWS_NEURON_CORE
- return AWS_NEURON_CORE
- @staticmethod
- def validate_resource_request_quantity(
- quantity: float,
- ) -> Tuple[bool, Optional[str]]:
- if isinstance(quantity, float) and not quantity.is_integer():
- return (
- False,
- f"{NeuronAcceleratorManager.get_resource_name()} resource quantity"
- " must be whole numbers. "
- f"The specified quantity {quantity} is invalid.",
- )
- else:
- return (True, None)
- @staticmethod
- def set_current_process_visible_accelerator_ids(
- visible_neuron_core_ids: List[str],
- ) -> None:
- """Set the NEURON_RT_VISIBLE_CORES environment variable based on
- given visible_neuron_core_ids.
- Args:
- visible_neuron_core_ids (List[str]): List of int representing core IDs.
- """
- if env_bool(NOSET_AWS_NEURON_RT_VISIBLE_CORES_ENV_VAR, False):
- return
- os.environ[
- NeuronAcceleratorManager.get_visible_accelerator_ids_env_var()
- ] = ",".join([str(i) for i in visible_neuron_core_ids])
- @staticmethod
- def get_ec2_instance_num_accelerators(
- instance_type: str, instances: dict
- ) -> Optional[int]:
- # TODO: AWS SDK (public API) doesn't yet expose the NeuronCore
- # information. It will be available (work-in-progress)
- # as xxAcceleratorInfo in InstanceTypeInfo.
- # https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_InstanceTypeInfo.html
- # See https://github.com/ray-project/ray/issues/38473
- return AWS_NEURON_INSTANCE_MAP.get(instance_type.lower(), None)
- @staticmethod
- def get_ec2_instance_accelerator_type(
- instance_type: str, instances: dict
- ) -> Optional[str]:
- from ray.util.accelerators import AWS_NEURON_CORE
- return AWS_NEURON_CORE
|