| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528 |
- import collections
- import logging
- import os
- import random
- import shutil
- import subprocess
- import sys
- import threading
- import time
- from ray._common.network_utils import is_ipv6
# Module-level logger used by the helpers in this file to report warnings and
# diagnostic information.
_logger = logging.getLogger("ray.util.spark.utils")
def is_in_databricks_runtime():
    """
    Tell whether the current process runs inside a Databricks runtime.

    Returns:
        True if the `DATABRICKS_RUNTIME_VERSION` environment variable is set
        (to any value), False otherwise.
    """
    runtime_version = os.environ.get("DATABRICKS_RUNTIME_VERSION")
    return runtime_version is not None
def gen_cmd_exec_failure_msg(cmd, return_code, tail_output_deque):
    """
    Build the error message reported when a command exits with a failure code.

    Args:
        cmd: The command as a sequence of strings; it is joined with spaces.
        return_code: The exit code of the command.
        tail_output_deque: Iterable of captured output lines; they are
            concatenated as-is (no separator is inserted).

    Returns:
        The formatted, multi-line failure message.
    """
    joined_cmd = " ".join(cmd)
    captured_output = "".join(tail_output_deque)
    message = (
        f"Command {joined_cmd} failed with return code {return_code}, tail output are "
        f"included below.\n{captured_output}\n"
    )
    return message
def get_configured_spark_executor_memory_bytes(spark):
    """
    Return the configured `spark.executor.memory` value converted to bytes.

    The value is read from `spark.conf` (falling back to "1g") and parsed as an
    integer followed by an optional size unit. Both the one-letter ("k", "m",
    "g", "t", "p") and the two-letter ("kb", "mb", "gb", "tb", "pb") suffixes are
    accepted, as well as "b" for bytes. A bare number is interpreted as
    mebibytes, matching how Spark itself reads this setting.

    Args:
        spark: An object exposing `conf.get(key, default)`, e.g. a SparkSession.

    Returns:
        The executor memory size in bytes, as an int.

    Raises:
        ValueError: If the configured value cannot be parsed.
    """
    import re

    value_str = spark.conf.get("spark.executor.memory", "1g").strip().lower()

    kib = 1024
    unit_map = {
        # Spark treats a unit-less `spark.executor.memory` value as MiB.
        "": kib**2,
        "b": 1,
        "k": kib,
        "kb": kib,
        "m": kib**2,
        "mb": kib**2,
        "g": kib**3,
        "gb": kib**3,
        "t": kib**4,
        "tb": kib**4,
        "p": kib**5,
        "pb": kib**5,
    }

    matched = re.fullmatch(r"([0-9]+)([a-z]*)", value_str)
    if matched is None or matched.group(2) not in unit_map:
        raise ValueError(
            f"Cannot parse 'spark.executor.memory' config value: {value_str!r}"
        )

    return int(matched.group(1)) * unit_map[matched.group(2)]
def exec_cmd(
    cmd,
    *,
    extra_env=None,
    synchronous=True,
    **kwargs,
):
    """
    A convenience wrapper of `subprocess.Popen` for running a command from a Python
    script.

    The subprocess stdout and stderr are merged and streamed, line by line, to
    the current process stdout by a background thread. The last 100 output lines
    are additionally kept in a deque.

    Args:
        cmd: The command to run, passed to `subprocess.Popen`.
        extra_env: Optional dict of environment variables added on top of the
            current process environment. Cannot be combined with `env`.
        synchronous: If True, wait until the process terminates and raise if its
            return code is not 0. If False, return immediately.
        **kwargs: Extra arguments forwarded to `subprocess.Popen`. `text`,
            `stdout` and `stderr` are not allowed because this function sets
            them itself.

    Returns:
        None if `synchronous` is True. Otherwise a tuple of the `Popen` instance
        and the deque instance holding the tail output.

    Raises:
        ValueError: If `kwargs` contains a reserved key, or if both `env` and
            `extra_env` are given.
        RuntimeError: If `synchronous` is True and the command exits with a
            non-zero return code; the message contains the tail output.
    """
    illegal_kwargs = set(kwargs.keys()).intersection({"text", "stdout", "stderr"})
    if illegal_kwargs:
        raise ValueError(f"`kwargs` cannot contain {list(illegal_kwargs)}")

    env = kwargs.pop("env", None)
    if extra_env is not None and env is not None:
        raise ValueError("`extra_env` and `env` cannot be used at the same time")

    env = env if extra_env is None else {**os.environ, **extra_env}

    process = subprocess.Popen(
        cmd,
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        **kwargs,
    )

    tail_output_deque = collections.deque(maxlen=100)

    def redirect_log_thread_fn():
        for line in process.stdout:
            # collect tail logs by `tail_output_deque`
            tail_output_deque.append(line)
            # redirect to stdout.
            sys.stdout.write(line)

    redirect_log_thread = threading.Thread(target=redirect_log_thread_fn, args=())
    redirect_log_thread.start()

    if not synchronous:
        return process, tail_output_deque

    return_code = process.wait()
    if return_code != 0:
        # The process has exited but the reader thread may still be draining the
        # pipe. Wait for it so the error message holds the complete tail output
        # and the deque is not mutated while it is being joined. The wait is
        # bounded because a descendant process that inherited the pipe could
        # keep it open indefinitely.
        redirect_log_thread.join(timeout=5)
        raise RuntimeError(
            gen_cmd_exec_failure_msg(cmd, return_code, tail_output_deque)
        )
def is_port_in_use(host, port):
    """
    Check whether something accepts TCP connections on `host`:`port`.

    An IPv6 socket is used when `is_ipv6(host)` is true, an IPv4 socket
    otherwise. The socket is closed before returning.

    Returns:
        True if `connect_ex` reports success (return value 0), False otherwise.
    """
    import socket

    address_family = socket.AF_INET6 if is_ipv6(host) else socket.AF_INET
    with socket.socket(address_family, socket.SOCK_STREAM) as probe:
        connect_status = probe.connect_ex((host, port))
    return connect_status == 0
def _wait_service_up(host, port, timeout):
    """
    Poll once per second until `host`:`port` accepts connections.

    Args:
        host: Host to probe.
        port: Port to probe.
        timeout: Maximum time to keep polling, in seconds.

    Returns:
        True as soon as the port is found in use, False if `timeout` seconds
        elapsed first.
    """
    started_at = time.time()
    while True:
        if not (time.time() - started_at < timeout):
            return False
        if is_port_in_use(host, port):
            return True
        time.sleep(1)
def get_random_unused_port(
    host, min_port=1024, max_port=65535, max_retries=100, exclude_list=None
):
    """
    Pick a random port in [`min_port`, `max_port`] that is not in use on `host`.

    Args:
        host: Host on which the port is probed.
        min_port: Lowest candidate port (inclusive).
        max_port: Highest candidate port (inclusive).
        max_retries: Number of random candidates to try; candidates found in
            `exclude_list` count as a try as well.
        exclude_list: Optional collection of ports that must not be returned.

    Returns:
        A port number that was not in use when probed.

    Raises:
        RuntimeError: If no suitable port was found within `max_retries` tries.
    """
    # Draw candidates from the operating system's randomness source.
    port_picker = random.SystemRandom()
    excluded_ports = exclude_list or []

    for _attempt in range(max_retries):
        candidate = port_picker.randint(min_port, max_port)
        if candidate not in excluded_ports and not is_port_in_use(host, candidate):
            return candidate

    raise RuntimeError(
        f"Get available port between range {min_port} and {max_port} failed."
    )
def get_spark_session():
    """
    Return the currently active Spark session.

    Raises:
        RuntimeError: If `SparkSession.getActiveSession()` returns None.
    """
    from pyspark.sql import SparkSession

    active_session = SparkSession.getActiveSession()
    if active_session is not None:
        return active_session

    raise RuntimeError(
        "Spark session haven't been initiated yet. Please use "
        "`SparkSession.builder` to create a spark session and connect to a spark "
        "cluster."
    )
def get_spark_application_driver_host(spark):
    """Return the value of the `spark.driver.host` config of `spark`."""
    driver_host = spark.conf.get("spark.driver.host")
    return driver_host
def get_max_num_concurrent_tasks(spark_context, resource_profile):
    """
    Gets the current max number of concurrent tasks.

    Args:
        spark_context: The PySpark SparkContext.
        resource_profile: Resource profile to query for, or None to query for
            the default resource profile.

    Returns:
        The value reported by the JVM SparkContext `maxNumConcurrentTasks`.
    """
    # pylint: disable=protected-access
    jvm_spark_context = spark_context._jsc.sc()

    if resource_profile is None:
        default_profile = (
            jvm_spark_context.resourceProfileManager().defaultResourceProfile()
        )
        return jvm_spark_context.maxNumConcurrentTasks(default_profile)

    def noop_mapper(_):
        pass

    # Runs a dummy spark job to register the `resource_profile`
    profiled_rdd = spark_context.parallelize([1], 1).withResources(resource_profile)
    profiled_rdd.map(noop_mapper).collect()

    return jvm_spark_context.maxNumConcurrentTasks(
        resource_profile._java_resource_profile
    )
def _get_spark_worker_total_physical_memory():
    """
    Return the total physical memory of the current node, in bytes.

    The `RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES` environment variable, when
    set, takes precedence over the value reported by `psutil`.
    """
    import psutil

    configured_bytes = os.environ.get(RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES)
    if configured_bytes is None:
        return psutil.virtual_memory().total
    return int(configured_bytes)
def _get_spark_worker_total_shared_memory():
    """
    Return the total shared memory of the current node, in bytes.

    The `RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES` environment variable, when
    set, takes precedence over the total size of `/dev/shm`.
    """
    import shutil

    configured_bytes = os.environ.get(RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES)
    if configured_bytes is None:
        return shutil.disk_usage("/dev/shm").total
    return int(configured_bytes)
# The maximum proportion for Ray worker node object store memory size: the
# object store size is capped at this fraction of the memory available to a Ray
# node.
_RAY_ON_SPARK_MAX_OBJECT_STORE_MEMORY_PROPORTION = 0.8

# The buffer offset for calculating Ray node memory: the detected physical and
# shared memory sizes are multiplied by this factor before the per-node memory
# sizes are calculated.
_RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET = 0.8
def calc_mem_ray_head_node(configured_heap_memory_bytes, configured_object_store_bytes):
    """
    Calculate heap memory and object store memory sizes for the Ray head node.

    The physical and shared memory totals are taken from the
    `RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES` /
    `RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES` environment variables when set,
    otherwise from `psutil` and the size of `/dev/shm`. Both totals are scaled
    by `_RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET` before the calculation. Any
    warning produced by the calculation is logged.

    Args:
        configured_heap_memory_bytes: User-configured heap memory, or None.
        configured_object_store_bytes: User-configured object store memory, or
            a falsy value to use the default.

    Returns:
        A tuple of (heap_mem_bytes, object_store_bytes).
    """
    import shutil

    import psutil

    physical_override = os.environ.get(RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES)
    if physical_override is not None:
        total_physical_mem = int(physical_override)
    else:
        total_physical_mem = psutil.virtual_memory().total
    usable_physical_mem = total_physical_mem * _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET

    shared_override = os.environ.get(RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES)
    if shared_override is not None:
        total_shared_mem = int(shared_override)
    else:
        total_shared_mem = shutil.disk_usage("/dev/shm").total
    usable_shared_mem = total_shared_mem * _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET

    calc_result = _calc_mem_per_ray_node(
        usable_physical_mem,
        usable_shared_mem,
        configured_heap_memory_bytes,
        configured_object_store_bytes,
    )
    heap_mem_bytes, object_store_bytes, warning_msg = calc_result

    if warning_msg is not None:
        _logger.warning(warning_msg)

    return heap_mem_bytes, object_store_bytes
def _calc_mem_per_ray_worker_node(
    num_task_slots,
    physical_mem_bytes,
    shared_mem_bytes,
    configured_heap_memory_bytes,
    configured_object_store_bytes,
):
    """
    Calculate memory sizes for one Ray worker node.

    The physical and shared memory totals are split evenly across
    `num_task_slots`, scaled by `_RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET` and
    truncated to int before being passed to `_calc_mem_per_ray_node`.

    Returns:
        The tuple returned by `_calc_mem_per_ray_node`:
        (heap_mem_bytes, object_store_bytes, warning_msg).
    """
    per_node_physical_mem = int(
        physical_mem_bytes / num_task_slots * _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET
    )
    per_node_shared_mem = int(
        shared_mem_bytes / num_task_slots * _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET
    )
    calc_result = _calc_mem_per_ray_node(
        per_node_physical_mem,
        per_node_shared_mem,
        configured_heap_memory_bytes,
        configured_object_store_bytes,
    )
    return calc_result
def _calc_mem_per_ray_node(
    available_physical_mem_per_node,
    available_shared_mem_per_node,
    configured_heap_memory_bytes,
    configured_object_store_bytes,
):
    """
    Calculate the heap memory and object store memory sizes for one Ray node.

    The object store size starts from `configured_object_store_bytes`, or from
    `DEFAULT_OBJECT_STORE_MEMORY_PROPORTION` of the available physical memory
    when that is falsy. It is then adjusted in this order:

    1. capped by the available shared memory, unless the
       `RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE` environment variable is set;
    2. capped by `_RAY_ON_SPARK_MAX_OBJECT_STORE_MEMORY_PROPORTION` of the
       available physical memory;
    3. raised to `OBJECT_STORE_MINIMUM_MEMORY_BYTES` if it is smaller.

    Args:
        available_physical_mem_per_node: Physical memory available to the node,
            in bytes.
        available_shared_mem_per_node: Shared memory available to the node, in
            bytes.
        configured_heap_memory_bytes: User-configured heap memory, or None to
            use the available physical memory minus the object store size.
        configured_object_store_bytes: User-configured object store memory, or
            a falsy value to use the default proportion.

    Returns:
        A tuple of (heap_mem_bytes, object_store_bytes, warning_msg), where the
        sizes are ints and `warning_msg` is None if no adjustment in step 2 or 3
        took place. If both took place, only the step 3 message is returned.
    """
    from ray._private.ray_constants import (
        DEFAULT_OBJECT_STORE_MEMORY_PROPORTION,
        OBJECT_STORE_MINIMUM_MEMORY_BYTES,
    )

    warning_msg = None

    object_store_bytes = configured_object_store_bytes or (
        available_physical_mem_per_node * DEFAULT_OBJECT_STORE_MEMORY_PROPORTION
    )

    # If Ray is allowed to use slow storage as object store,
    # we don't need to cap object store size by /dev/shm capacity
    if not os.environ.get("RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE"):
        if object_store_bytes > available_shared_mem_per_node:
            object_store_bytes = available_shared_mem_per_node

    object_store_bytes_upper_bound = (
        available_physical_mem_per_node
        * _RAY_ON_SPARK_MAX_OBJECT_STORE_MEMORY_PROPORTION
    )

    if object_store_bytes > object_store_bytes_upper_bound:
        object_store_bytes = object_store_bytes_upper_bound
        warning_msg = (
            "Your configured `object_store_memory_per_node` value "
            "is too high and it is capped by 80% of per-Ray node "
            "allocated memory."
        )

    if object_store_bytes < OBJECT_STORE_MINIMUM_MEMORY_BYTES:
        # The size equals the shared memory size when it was capped by it above,
        # in which case /dev/shm is what limits the object store.
        if object_store_bytes == available_shared_mem_per_node:
            warning_msg = (
                "Your operating system is configured with too small /dev/shm "
                "size, so `object_store_memory_worker_node` value is configured "
                f"to minimal size ({OBJECT_STORE_MINIMUM_MEMORY_BYTES} bytes). "
                "Please increase system /dev/shm size."
            )
        else:
            warning_msg = (
                "You configured too small Ray node object store memory size, "
                "so `object_store_memory_worker_node` value is configured "
                f"to minimal size ({OBJECT_STORE_MINIMUM_MEMORY_BYTES} bytes). "
                "Please increase 'object_store_memory_worker_node' argument value."
            )
        object_store_bytes = OBJECT_STORE_MINIMUM_MEMORY_BYTES

    object_store_bytes = int(object_store_bytes)

    if configured_heap_memory_bytes is None:
        heap_mem_bytes = int(available_physical_mem_per_node - object_store_bytes)
    else:
        heap_mem_bytes = int(configured_heap_memory_bytes)

    return heap_mem_bytes, object_store_bytes, warning_msg
# Users can set these environment variables manually if the Ray on Spark code
# fails to access the corresponding information.
# Note that these environment variables must be set on the Spark executor side,
# i.e. they should be set via the Spark config
# `spark.executorEnv.[EnvironmentVariableName]`.
RAY_ON_SPARK_WORKER_CPU_CORES = "RAY_ON_SPARK_WORKER_CPU_CORES"
RAY_ON_SPARK_WORKER_GPU_NUM = "RAY_ON_SPARK_WORKER_GPU_NUM"
RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES = "RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES"
RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES = "RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES"

# Users can set these environment variables manually on the Spark driver node
# if the Ray on Spark code fails to access the corresponding information.
RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES = "RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES"
RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES = "RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES"
def _get_cpu_cores():
    """
    Return the number of CPU cores of the current node.

    The `RAY_ON_SPARK_WORKER_CPU_CORES` environment variable, when set, takes
    precedence over `multiprocessing.cpu_count()`.
    """
    import multiprocessing

    # In some cases, spark standalone cluster might configure virtual cpu cores
    # for spark worker that differ from the number of physical cpu cores, but we
    # cannot easily get the virtual cpu cores configured for spark worker. As a
    # workaround, we provide an environment variable config
    # `RAY_ON_SPARK_WORKER_CPU_CORES` for user.
    configured_cores = os.environ.get(RAY_ON_SPARK_WORKER_CPU_CORES)
    if configured_cores is None:
        return multiprocessing.cpu_count()
    return int(configured_cores)
def _get_num_physical_gpus():
    """
    Return the number of physical GPUs of the current node.

    The `RAY_ON_SPARK_WORKER_GPU_NUM` environment variable, when set, takes
    precedence. Otherwise the GPUs listed by `nvidia-smi` are counted, one per
    non-empty output line.

    Returns:
        The number of GPUs; 0 if `nvidia-smi` is not found on PATH, fails, or
        lists no GPU.
    """
    if RAY_ON_SPARK_WORKER_GPU_NUM in os.environ:
        # In some cases, spark standalone cluster might configure part of physical
        # GPUs for spark worker,
        # but we cannot easily get related configuration,
        # as a workaround, we provide an environmental variable config
        # `RAY_ON_SPARK_WORKER_GPU_NUM` for user.
        return int(os.environ[RAY_ON_SPARK_WORKER_GPU_NUM])

    if shutil.which("nvidia-smi") is None:
        # GPU driver is not installed.
        return 0

    try:
        # The command is constant, so it is passed as an argument list and no
        # shell is involved.
        completed_proc = subprocess.run(
            ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"],
            check=True,
            text=True,
            capture_output=True,
        )
    except Exception as e:
        _logger.info(
            "'nvidia-smi --query-gpu=name --format=csv,noheader' command execution "
            f"failed, error: {repr(e)}"
        )
        return 0

    # Count non-empty lines only: splitting an empty output on "\n" yields one
    # empty string, which would be reported as one GPU.
    return sum(1 for line in completed_proc.stdout.splitlines() if line.strip())
- def _get_local_ray_node_slots(
- num_cpus,
- num_gpus,
- num_cpus_per_node,
- num_gpus_per_node,
- ):
- if num_cpus_per_node > num_cpus:
- raise ValueError(
- "cpu number per Ray worker node should be <= spark worker node CPU cores, "
- f"you set cpu number per Ray worker node to {num_cpus_per_node} but "
- f"spark worker node CPU core number is {num_cpus}."
- )
- num_ray_node_slots = num_cpus // num_cpus_per_node
- if num_gpus_per_node > 0:
- if num_gpus_per_node > num_gpus:
- raise ValueError(
- "gpu number per Ray worker node should be <= spark worker node "
- "GPU number, you set GPU devices number per Ray worker node to "
- f"{num_gpus_per_node} but spark worker node GPU devices number "
- f"is {num_gpus}."
- )
- if num_ray_node_slots > num_gpus // num_gpus_per_node:
- num_ray_node_slots = num_gpus // num_gpus_per_node
- return num_ray_node_slots
def _get_avail_mem_per_ray_worker_node(
    num_cpus_per_node,
    num_gpus_per_node,
    heap_memory_per_node,
    object_store_memory_per_node,
):
    """
    Infer the memory available to each Ray worker node on the current machine.

    Returns tuple of (
        ray_worker_node_heap_mem_bytes,
        ray_worker_node_object_store_bytes,
        error_message,  # always None
        warning_message,
    )
    """
    cpu_count = _get_cpu_cores()
    # GPUs are only probed when Ray worker nodes actually request some.
    gpu_count = _get_num_physical_gpus() if num_gpus_per_node > 0 else 0

    node_slots = _get_local_ray_node_slots(
        cpu_count, gpu_count, num_cpus_per_node, num_gpus_per_node
    )

    total_physical_mem = _get_spark_worker_total_physical_memory()
    total_shared_mem = _get_spark_worker_total_shared_memory()

    heap_mem_bytes, object_store_bytes, warning_msg = _calc_mem_per_ray_worker_node(
        node_slots,
        total_physical_mem,
        total_shared_mem,
        heap_memory_per_node,
        object_store_memory_per_node,
    )
    return heap_mem_bytes, object_store_bytes, None, warning_msg
def get_avail_mem_per_ray_worker_node(
    spark,
    heap_memory_per_node,
    object_store_memory_per_node,
    num_cpus_per_node,
    num_gpus_per_node,
):
    """
    Return the available heap memory and object store memory for each ray worker.

    The inference runs inside a single-task spark job. A failure inside that
    task is raised as a RuntimeError on the caller side, and a warning produced
    by the inference is logged.

    Return value is a tuple of
    (ray_worker_node_heap_mem_bytes, ray_worker_node_object_store_bytes)

    NB: We have one ray node per spark task.
    """

    def infer_on_executor(_):
        # Errors are returned as data rather than raised, so that the caller can
        # report them together with the traceback captured here.
        try:
            return _get_avail_mem_per_ray_worker_node(
                num_cpus_per_node,
                num_gpus_per_node,
                heap_memory_per_node,
                object_store_memory_per_node,
            )
        except Exception as e:
            import traceback

            trace_msg = "\n".join(traceback.format_tb(e.__traceback__))
            return -1, -1, repr(e) + trace_msg, None

    # Running memory inference routine on spark executor side since the spark worker
    # nodes may have a different machine configuration compared to the spark driver
    # node.
    task_results = spark.sparkContext.parallelize([1], 1).map(infer_on_executor).collect()
    heap_mem_bytes, object_store_bytes, err, warning_msg = task_results[0]

    if err is not None:
        raise RuntimeError(
            f"Inferring ray worker node available memory failed, error: {err}. "
            "You can bypass this error by setting following spark configs: "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_CPU_CORES, "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_GPU_NUM, "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES, "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES."
        )

    if warning_msg is not None:
        _logger.warning(warning_msg)

    return heap_mem_bytes, object_store_bytes
def get_spark_task_assigned_physical_gpus(gpu_addr_list):
    """
    Map GPU addresses to the device ids listed in `CUDA_VISIBLE_DEVICES`.

    Args:
        gpu_addr_list: List of GPU addresses. When `CUDA_VISIBLE_DEVICES` is
            set, each address is used as an index into the list of device ids
            parsed from that variable.

    Returns:
        The list of mapped device ids, or `gpu_addr_list` itself when
        `CUDA_VISIBLE_DEVICES` is not set.
    """
    visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
    if visible_devices is None:
        return gpu_addr_list

    device_ids = [int(token.strip()) for token in visible_devices.split(",")]
    return [device_ids[index] for index in gpu_addr_list]
|