| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684 |
- import contextlib
- import importlib
- import json
- import logging
- import multiprocessing
- import os
- import platform
- import re
- import signal
- import subprocess
- import sys
- import threading
- import time
- from collections import defaultdict
- from pathlib import Path
- from subprocess import list2cmdline
- from typing import (
- TYPE_CHECKING,
- Dict,
- List,
- Mapping,
- Optional,
- Tuple,
- Union,
- )
- from google.protobuf import json_format
- import ray
- import ray._private.ray_constants as ray_constants
- from ray._common.utils import (
- PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME,
- get_ray_address_file,
- get_system_memory,
- )
- from ray.core.generated.runtime_environment_pb2 import (
- RuntimeEnvInfo as ProtoRuntimeEnvInfo,
- )
- # Import psutil after ray so the packaged version is used.
- import psutil
- if TYPE_CHECKING:
- from ray.runtime_env import RuntimeEnv
# Largest value representable by a signed 32-bit integer.
INT32_MAX = (2**31) - 1

# `pwd` is only imported off Windows; the name stays None on win32 so callers
# can test `pwd is None` before using it.
pwd = None
if sys.platform != "win32":
    import pwd

logger = logging.getLogger(__name__)

# Linux can bind child processes' lifetimes to that of their parents via prctl.
# prctl support is detected dynamically once, and assumed thereafter.
linux_prctl = None
# Windows can bind processes' lifetimes to that of kernel-level "job objects".
# We keep a global job object to tie its lifetime to that of our own process.
win32_job = None
win32_AssignProcessToJobObject = None

# True when RAY_DISABLE_DOCKER_CPU_WARNING is present in the environment,
# regardless of its value (evaluated once at import time).
ENV_DISABLE_DOCKER_CPU_WARNING = "RAY_DISABLE_DOCKER_CPU_WARNING" in os.environ

# This global variable is used for testing only
_CALLED_FREQ = defaultdict(lambda: 0)
_CALLED_FREQ_LOCK = threading.Lock()

# Matches "<prefix>_group_<digits>_<alphanumeric>".
# NOTE(review): presumably <digits> is the bundle index and the trailing token
# the placement group id -- confirm against the callers.
PLACEMENT_GROUP_INDEXED_BUNDLED_RESOURCE_PATTERN = re.compile(
    r"(.+)_group_(\d+)_([0-9a-zA-Z]+)"
)
# Matches "<prefix>_group_<alphanumeric>" (no numeric index component).
PLACEMENT_GROUP_WILDCARD_RESOURCE_PATTERN = re.compile(r"(.+)_group_([0-9a-zA-Z]+)")
def write_ray_address(ray_address: str, temp_dir: Optional[str] = None):
    """Store `ray_address` in the Ray address file.

    The file location comes from `get_ray_address_file(temp_dir)`. If the
    file already holds the same address nothing is written; if it holds a
    different one, an info message is logged before it is overwritten.

    Args:
        ray_address: The address to record.
        temp_dir: Forwarded to `get_ray_address_file`.
    """
    path = get_ray_address_file(temp_dir)

    if os.path.exists(path):
        with open(path, "r") as existing:
            previous = existing.read()

        if previous == ray_address:
            # The recorded address is already current.
            return

        logger.info(
            f"Overwriting previous Ray address ({previous}). "
            "Running ray.init() on this node will now connect to the new "
            f"instance at {ray_address}. To override this behavior, pass "
            f"address={previous} to ray.init()."
        )

    with open(path, "w+") as out:
        out.write(ray_address)
def read_ray_address(temp_dir: Optional[str] = None) -> Optional[str]:
    """Read the address stored in the Ray address file.

    Args:
        temp_dir: Forwarded to `get_ray_address_file` to locate the file.

    Returns:
        The file content with surrounding whitespace stripped, or None when
        the address file does not exist.
    """
    address_file = get_ray_address_file(temp_dir)
    if not os.path.exists(address_file):
        return None
    with open(address_file, "r") as f:
        return f.read().strip()
def format_error_message(exception_message: str, task_exception: bool = False):
    """Improve the formatting of an exception thrown by a remote function.

    This method takes a traceback from an exception and makes it nicer by
    removing a few uninformative lines.

    Args:
        exception_message: A message generated by traceback.format_exc().
        task_exception: If True, the lines at index 1 and 2 of the message
            are dropped; otherwise the message is returned unchanged.

    Returns:
        A string of the formatted exception message.
    """
    lines = exception_message.split("\n")
    if task_exception:
        # For errors that occur inside of tasks, remove lines 1 and 2 which are
        # always the same, they just contain information about the worker code.
        lines = lines[0:1] + lines[3:]
    return "\n".join(lines)
def push_error_to_driver(
    worker, error_type: str, message: str, job_id: Optional[str] = None
):
    """Send an error message to the driver through the worker's core worker.

    Args:
        worker: The worker whose `core_worker.push_error` is invoked.
        error_type: The type of the error.
        message: The message that will be printed in the background
            on the driver.
        job_id: The ID of the driver to push the error message to. When
            None, the nil job ID is used so the message goes to all drivers.
            Any non-None value must be a `ray.JobID` (asserted below).
    """
    target_job = ray.JobID.nil() if job_id is None else job_id
    assert isinstance(target_job, ray.JobID)
    # The current wall-clock time is passed as the error timestamp.
    worker.core_worker.push_error(target_job, error_type, message, time.time())
def publish_error_to_driver(
    error_type: str,
    message: str,
    gcs_client,
    job_id=None,
):
    """Publish an error message for the driver through a GCS client.

    Normally the push_error_to_driver function should be used. However, in some
    instances, the raylet client is not available, e.g., because the
    error happens in Python before the driver or worker has connected to the
    backend processes.

    Failures while publishing are logged and not propagated.

    Args:
        error_type: The type of the error.
        message: The message that will be printed in the background
            on the driver.
        gcs_client: The GCS client to use.
        job_id: The ID of the driver to push the error message to. If this
            is None, then the message will be pushed to all drivers.
    """
    target_job = job_id if job_id is not None else ray.JobID.nil()
    assert isinstance(target_job, ray.JobID)

    try:
        # The hex form of the job ID, as bytes, is the first argument.
        key = target_job.hex().encode()
        gcs_client.publish_error(key, error_type, message, target_job, 60)
    except Exception:
        logger.exception(f"Failed to publish error: {message} [type {error_type}]")
def ensure_str(s, encoding="utf-8", errors="strict"):
    """Return *s* as a `str`.

    A `str` is returned untouched; `bytes` are decoded with the given
    `encoding` and `errors` policy. Any other type fails the assertion.
    """
    if not isinstance(s, str):
        assert isinstance(s, bytes), f"Expected str or bytes, got {type(s)}"
        return s.decode(encoding, errors)
    return s
def binary_to_object_ref(binary_object_ref):
    """Build a `ray.ObjectRef` from `binary_object_ref`."""
    object_ref = ray.ObjectRef(binary_object_ref)
    return object_ref
def binary_to_task_id(binary_task_id):
    """Build a `ray.TaskID` from `binary_task_id`."""
    task_id = ray.TaskID(binary_task_id)
    return task_id
# TODO(qwang): Remove these helper functions
# once we separate `WorkerID` from `UniqueID`.
def compute_job_id_from_driver(driver_id):
    """Derive a `ray.JobID` from the leading bytes of a driver's `ray.WorkerID`."""
    assert isinstance(driver_id, ray.WorkerID)
    raw = driver_id.binary()
    # Only the first JobID.size() bytes of the worker ID form the job ID.
    prefix = raw[0 : ray.JobID.size()]
    return ray.JobID(prefix)
def compute_driver_id_from_job(job_id):
    """Build a driver `ray.WorkerID` by padding a `ray.JobID` with 0xff bytes."""
    assert isinstance(job_id, ray.JobID)
    # Pad up to the full ID size so the result has ray_constants.ID_SIZE bytes.
    padding = (ray_constants.ID_SIZE - job_id.size()) * b"\xff"
    return ray.WorkerID(job_id.binary() + padding)
def get_visible_accelerator_ids() -> Mapping[str, Optional[List[str]]]:
    """Map every accelerator resource name to the ids visible to this process.

    Returns:
        A dict keyed by each name from `get_all_accelerator_resource_names()`,
        whose value is what that resource's accelerator manager reports via
        `get_current_process_visible_accelerator_ids()`.
    """
    from ray._private.accelerators import (
        get_accelerator_manager_for_resource,
        get_all_accelerator_resource_names,
    )

    visible_ids = {}
    for resource_name in get_all_accelerator_resource_names():
        manager = get_accelerator_manager_for_resource(resource_name)
        visible_ids[resource_name] = (
            manager.get_current_process_visible_accelerator_ids()
        )
    return visible_ids
def set_omp_num_threads_if_unset() -> bool:
    """Default OMP_NUM_THREADS to the number of CPUs assigned to the worker.

    The variable is only written when it is not already present in the
    environment and the current process runs in WORKER_MODE.

    Returns:
        True if this function set OMP_NUM_THREADS, False otherwise.
    """
    if "OMP_NUM_THREADS" in os.environ:
        # Respect an explicit user setting.
        return False

    ctx = ray.get_runtime_context()
    if ctx.worker.mode != ray._private.worker.WORKER_MODE:
        # Only worker processes get a default.
        return False

    assigned_cpus = ctx.get_assigned_resources().get("CPU")
    if assigned_cpus is None:
        # This is an actor task w/o any num_cpus specified, set it to 1
        logger.debug(
            "[ray] Forcing OMP_NUM_THREADS=1 to avoid performance "
            "degradation with many workers (issue #6998). You can override this "
            "by explicitly setting OMP_NUM_THREADS, or changing num_cpus."
        )
        assigned_cpus = 1

    import math

    # Fractional assignments are floored, but never below one thread.
    thread_count = max(math.floor(assigned_cpus), 1)
    os.environ["OMP_NUM_THREADS"] = str(thread_count)
    return True
def set_visible_accelerator_ids() -> Mapping[str, Optional[str]]:
    """Point the accelerator visibility env vars at the assigned accelerators.

    For every resource reported by the runtime context's
    `get_accelerator_ids()`, the matching accelerator manager is asked to set
    the visible ids for the current process (CUDA_VISIBLE_DEVICES,
    ONEAPI_DEVICE_SELECTOR, HIP_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES,
    TPU_VISIBLE_CHIPS, HABANA_VISIBLE_MODULES, ...).

    Returns:
        The previous value (or None if unset) of each env var that was
        considered, keyed by env var name.
    """
    from ray._private.ray_constants import env_bool

    accelerators = ray._private.accelerators
    saved_env_vars = {}
    override_on_zero = env_bool(
        accelerators.RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO_ENV_VAR,
        True,
    )

    assigned = ray.get_runtime_context().get_accelerator_ids()
    for resource_name, accelerator_ids in assigned.items():
        # With the override disabled, resources without any assigned ids
        # leave their environment variable alone.
        if not override_on_zero and len(accelerator_ids) == 0:
            continue

        env_var = accelerators.get_accelerator_manager_for_resource(
            resource_name
        ).get_visible_accelerator_ids_env_var()
        # Remember the old value before the manager overwrites it.
        saved_env_vars[env_var] = os.environ.get(env_var, None)

        accelerators.get_accelerator_manager_for_resource(
            resource_name
        ).set_current_process_visible_accelerator_ids(accelerator_ids)

    return saved_env_vars
def reset_visible_accelerator_env_vars(
    original_visible_accelerator_env_vars: Mapping[str, Optional[str]]
) -> None:
    """Restore environment variables to previously recorded values.

    Args:
        original_visible_accelerator_env_vars: Maps env var names to the value
            to restore; a None value means the variable is removed.
    """
    for name, original in original_visible_accelerator_env_vars.items():
        if original is not None:
            os.environ[name] = original
            continue
        # The variable was unset originally; removing a missing key is a no-op.
        os.environ.pop(name, None)
class Unbuffered:
    """Text-stream wrapper that flushes after every write.

    There's no "built-in" solution to programatically disabling buffering of
    text files. Ray expects stdout/err to be text files, so creating an
    unbuffered binary file is unacceptable.

    Attributes other than `write`/`writelines` are looked up on the wrapped
    stream.

    See
    https://mail.python.org/pipermail/tutor/2003-November/026645.html.
    https://docs.python.org/3/library/functions.html#open
    """

    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        target = self.stream
        target.write(data)
        target.flush()

    def writelines(self, datas):
        target = self.stream
        target.writelines(datas)
        target.flush()

    def __getattr__(self, attr):
        if attr != "stream":
            # Delegate everything else to the wrapped stream.
            return getattr(self.stream, attr)
        # Avoid endless loop when get `stream` attribute
        return super().__getattribute__("stream")
def open_log(path, unbuffered=False, **kwargs):
    """Open the log file at `path`, forwarding `kwargs` to `open`.

    Unless overridden by the caller, the file is opened with `buffering=1`,
    `mode="a"` and `encoding="utf-8"`. When `unbuffered` is true the stream
    is wrapped in `Unbuffered` before being returned.
    """
    # See test_advanced_3.py::test_logging_to_driver for why buffering is
    # restricted here.
    defaults = (("buffering", 1), ("mode", "a"), ("encoding", "utf-8"))
    for option, value in defaults:
        kwargs.setdefault(option, value)

    stream = open(path, **kwargs)
    return Unbuffered(stream) if unbuffered else stream
- def _get_docker_cpus(
- cpu_quota_file_name="/sys/fs/cgroup/cpu/cpu.cfs_quota_us",
- cpu_period_file_name="/sys/fs/cgroup/cpu/cpu.cfs_period_us",
- cpuset_file_name="/sys/fs/cgroup/cpuset/cpuset.cpus",
- cpu_max_file_name="/sys/fs/cgroup/cpu.max",
- ) -> Optional[float]:
- # TODO (Alex): Don't implement this logic oursleves.
- # Docker has 2 underyling ways of implementing CPU limits:
- # https://docs.docker.com/config/containers/resource_constraints/#configure-the-default-cfs-scheduler
- # 1. --cpuset-cpus 2. --cpus or --cpu-quota/--cpu-period (--cpu-shares is a
- # soft limit so we don't worry about it). For Ray's purposes, if we use
- # docker, the number of vCPUs on a machine is whichever is set (ties broken
- # by smaller value).
- cpu_quota = None
- # See: https://bugs.openjdk.java.net/browse/JDK-8146115
- if os.path.exists(cpu_quota_file_name) and os.path.exists(cpu_period_file_name):
- try:
- with (
- open(cpu_quota_file_name, "r") as quota_file,
- open(cpu_period_file_name, "r") as period_file,
- ):
- cpu_quota = float(quota_file.read()) / float(period_file.read())
- except Exception:
- logger.exception("Unexpected error calculating docker cpu quota.")
- # Look at cpu.max for cgroups v2
- elif os.path.exists(cpu_max_file_name):
- try:
- max_file = open(cpu_max_file_name).read()
- quota_str, period_str = max_file.split()
- if quota_str.isnumeric() and period_str.isnumeric():
- cpu_quota = float(quota_str) / float(period_str)
- else:
- # quota_str is "max" meaning the cpu quota is unset
- cpu_quota = None
- except Exception:
- logger.exception("Unexpected error calculating docker cpu quota.")
- if (cpu_quota is not None) and (cpu_quota < 0):
- cpu_quota = None
- elif cpu_quota == 0:
- # Round up in case the cpu limit is less than 1.
- cpu_quota = 1
- cpuset_num = None
- if os.path.exists(cpuset_file_name):
- try:
- with open(cpuset_file_name) as cpuset_file:
- ranges_as_string = cpuset_file.read()
- ranges = ranges_as_string.split(",")
- cpu_ids = []
- for num_or_range in ranges:
- if "-" in num_or_range:
- start, end = num_or_range.split("-")
- cpu_ids.extend(list(range(int(start), int(end) + 1)))
- else:
- cpu_ids.append(int(num_or_range))
- cpuset_num = len(cpu_ids)
- except Exception:
- logger.exception("Unexpected error calculating docker cpuset ids.")
- # Possible to-do: Parse cgroups v2's cpuset.cpus.effective for the number
- # of accessible CPUs.
- if cpu_quota and cpuset_num:
- return min(cpu_quota, cpuset_num)
- return cpu_quota or cpuset_num
def get_num_cpus(
    override_docker_cpu_warning: bool = ENV_DISABLE_DOCKER_CPU_WARNING,
    truncate: bool = True,
) -> float:
    """Return the number of CPUs available on this node.

    The value comes from multiprocessing.cpu_count() unless the cgroup limits
    read by `_get_docker_cpus()` report a different count, in which case the
    cgroup value wins. Setting the env var RAY_USE_MULTIPROCESSING_CPU_COUNT
    forces multiprocessing.cpu_count().

    Args:
        override_docker_cpu_warning: An extra flag to explicitly turn off the Docker
            warning. Setting this flag True has the same effect as setting the env
            RAY_DISABLE_DOCKER_CPU_WARNING. By default, whether or not to log
            the warning is determined by the env variable
            RAY_DISABLE_DOCKER_CPU_WARNING.
        truncate: truncates the return value and drops the decimal part.
    """
    host_count = multiprocessing.cpu_count()

    if os.environ.get("RAY_USE_MULTIPROCESSING_CPU_COUNT"):
        logger.info(
            "Detected RAY_USE_MULTIPROCESSING_CPU_COUNT=1: Using "
            "multiprocessing.cpu_count() to detect the number of CPUs. "
            "This may be inconsistent when used inside docker. "
            "To correctly detect CPUs, unset the env var: "
            "`RAY_USE_MULTIPROCESSING_CPU_COUNT`."
        )
        return host_count

    try:
        # Not easy to get cpu count in docker, see:
        # https://bugs.python.org/issue36054
        docker_count = _get_docker_cpus()
        if docker_count is None or docker_count == host_count:
            return host_count

        # Don't log this warning if we're on K8s or if the warning is
        # explicitly disabled.
        warning_muted = (
            "KUBERNETES_SERVICE_HOST" in os.environ
            or ENV_DISABLE_DOCKER_CPU_WARNING
            or override_docker_cpu_warning
        )
        if not warning_muted:
            logger.warning(
                "Detecting docker specified CPUs. In "
                "previous versions of Ray, CPU detection in containers "
                "was incorrect. Please ensure that Ray has enough CPUs "
                "allocated. As a temporary workaround to revert to the "
                "prior behavior, set "
                "`RAY_USE_MULTIPROCESSING_CPU_COUNT=1` as an env var "
                "before starting Ray. Set the env var: "
                "`RAY_DISABLE_DOCKER_CPU_WARNING=1` to mute this warning."
            )

        # TODO (Alex): We should probably add support for fractional cpus.
        whole_count = int(docker_count)
        if whole_count != float(docker_count):
            logger.warning(
                f"Ray currently does not support initializing Ray "
                f"with fractional cpus. Your num_cpus will be "
                f"truncated from {docker_count} to "
                f"{int(docker_count)}."
            )
        return whole_count if truncate else docker_count
    except Exception:
        # `nproc` and cgroup are linux-only. If docker only works on linux
        # (will run in a linux VM on other platforms), so this is fine.
        return host_count
- # TODO(clarng): merge code with c++
def get_cgroup_used_memory(
    memory_stat_filename: str,
    memory_usage_filename: str,
    inactive_file_key: str,
    active_file_key: str,
):
    """Compute cgroup memory usage excluding inactive and active file bytes.

    The calculation logic is the same with `GetCGroupMemoryUsedBytes`
    in `memory_monitor.cc` file.

    Args:
        memory_stat_filename: Path of the cgroup memory stat file.
        memory_usage_filename: Path of the file whose first line is the
            cgroup's usage in bytes.
        inactive_file_key: Stat key whose value is the inactive file bytes.
        active_file_key: Stat key whose value is the active file bytes.

    Returns:
        Usage minus inactive-file minus active-file bytes, or None when a
        required value was not found.
    """
    inactive_bytes = -1
    active_bytes = -1
    # A stat line matches when it contains the key followed by a space.
    inactive_marker = f"{inactive_file_key} "
    active_marker = f"{active_file_key} "

    with open(memory_stat_filename, "r") as stat_file:
        for entry in stat_file.readlines():
            if inactive_marker in entry:
                inactive_bytes = int(entry.split()[1])
            elif active_marker in entry:
                active_bytes = int(entry.split()[1])

    with open(memory_usage_filename, "r") as usage_file:
        usage_lines = usage_file.readlines()
        usage_bytes = int(usage_lines[0].strip())

    if -1 in (inactive_bytes, usage_bytes, active_bytes):
        return None
    return usage_bytes - inactive_bytes - active_bytes
def resolve_object_store_memory(
    available_memory_bytes: int,
    object_store_memory: Optional[int] = None,
) -> int:
    """Resolve the object store memory size.

    This function determines the appropriate object store memory size based on
    the provided value or calculates a default based on available system memory.

    Args:
        available_memory_bytes: The memory available for this node.
        object_store_memory: The user-specified object store memory size in bytes.
            If None, a default size will be calculated. A non-None value is
            returned unchanged.

    Returns:
        The resolved object store memory size in bytes.
    """
    if object_store_memory is not None:
        # An explicit user setting is never adjusted.
        return object_store_memory

    # Derive default object store memory if not specified
    object_store_memory_cap = ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES

    # Cap by shm size by default to avoid low performance, but don't
    # go lower than REQUIRE_SHM_SIZE_THRESHOLD.
    if sys.platform == "linux" or sys.platform == "linux2":
        # Multiple by 0.95 to give a bit of wiggle-room.
        # https://github.com/ray-project/ray/pull/23034/files
        shm_avail = get_shared_memory_bytes() * 0.95
        shm_cap = max(ray_constants.REQUIRE_SHM_SIZE_THRESHOLD, shm_avail)

        object_store_memory_cap = min(object_store_memory_cap, shm_cap)

    object_store_memory = int(
        available_memory_bytes * ray_constants.DEFAULT_OBJECT_STORE_MEMORY_PROPORTION
    )

    # Set the object_store_memory size to 2GB on Mac
    # to avoid degraded performance.
    # (https://github.com/ray-project/ray/issues/20388)
    if sys.platform == "darwin":
        object_store_memory = min(
            object_store_memory, ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT
        )

    # Cap memory to avoid memory waste and perf issues on large nodes
    if object_store_memory > object_store_memory_cap:
        logger.debug(
            "Warning: Capping object memory store to {}GB. ".format(
                object_store_memory_cap // 1e9
            )
            + "To increase this further, specify `object_store_memory` "
            "when calling ray.init() or ray start."
        )
        # The shm-derived cap is a float (0.95 factor); coerce it so the
        # function honours its declared int return type.
        object_store_memory = int(object_store_memory_cap)

    return object_store_memory
def get_used_memory():
    """Return the currently used system memory in bytes.

    The cgroup accounting files (v1 first, then v2) are preferred when both
    the usage and stat file of a layout exist; otherwise, or when the cgroup
    calculation yields None, psutil's system-wide figure is used.

    Returns:
        The total amount of used memory
    """
    # (usage file, stat file, inactive-file key, active-file key) per layout.
    cgroup_layouts = (
        # For cgroups v1:
        (
            "/sys/fs/cgroup/memory/memory.usage_in_bytes",
            "/sys/fs/cgroup/memory/memory.stat",
            "total_inactive_file",
            "total_active_file",
        ),
        # For cgroups v2:
        (
            "/sys/fs/cgroup/memory.current",
            "/sys/fs/cgroup/memory.stat",
            "inactive_file",
            "active_file",
        ),
    )

    # Try to accurately figure out the memory usage if we are in a docker
    # container.
    docker_usage = None
    for usage_path, stat_path, inactive_key, active_key in cgroup_layouts:
        if os.path.exists(usage_path) and os.path.exists(stat_path):
            docker_usage = get_cgroup_used_memory(
                stat_path,
                usage_path,
                inactive_key,
                active_key,
            )
            # Only the first layout that is present is consulted.
            break

    if docker_usage is None:
        return psutil.virtual_memory().used
    return docker_usage
def estimate_available_memory():
    """Estimate the amount of system memory still available, in bytes.

    Returns:
        `get_system_memory()` minus `get_used_memory()`.
    """
    total_bytes = get_system_memory()
    used_bytes = get_used_memory()
    return total_bytes - used_bytes
def get_shared_memory_bytes():
    """Return the space available on the /dev/shm file system.

    Returns:
        Block size times the number of available blocks of /dev/shm, in bytes.
    """
    # Make sure this is only called on Linux.
    assert sys.platform in ("linux", "linux2")

    fd = os.open("/dev/shm", os.O_RDONLY)
    try:
        stats = os.fstatvfs(fd)
        # f_bsize is the block size and f_bavail the number of available
        # blocks.
        return stats.f_bsize * stats.f_bavail
    finally:
        os.close(fd)
def check_oversized_function(
    pickled: bytes, name: str, obj_type: str, worker: "ray.Worker"
) -> None:
    """Warn about, or reject, a pickled function that is too large.

    Args:
        pickled: the pickled function.
        name: name of the pickled object.
        obj_type: type of the pickled object, can be 'function',
            'remote function', or 'actor'.
        worker: the worker used to send the warning message to the driver.
            When it is falsy the warning is skipped.

    Raises:
        ValueError: If the pickled size is at or above
            FUNCTION_SIZE_ERROR_THRESHOLD.
    """
    size_bytes = len(pickled)
    if size_bytes <= ray_constants.FUNCTION_SIZE_WARN_THRESHOLD:
        return

    mib = 1024 * 1024
    if size_bytes >= ray_constants.FUNCTION_SIZE_ERROR_THRESHOLD:
        error = (
            "The {} {} is too large ({} MiB > FUNCTION_SIZE_ERROR_THRESHOLD={}"
            " MiB). Check that its definition is not implicitly capturing a "
            "large array or other object in scope. Tip: use ray.put() to "
            "put large objects in the Ray object store."
        ).format(
            obj_type,
            name,
            size_bytes // mib,
            ray_constants.FUNCTION_SIZE_ERROR_THRESHOLD // mib,
        )
        raise ValueError(error)

    # Between the two thresholds: warn only.
    warning_message = (
        "The {} {} is very large ({} MiB). "
        "Check that its definition is not implicitly capturing a large "
        "array or other object in scope. Tip: use ray.put() to put large "
        "objects in the Ray object store."
    ).format(obj_type, name, size_bytes // mib)
    if worker:
        push_error_to_driver(
            worker,
            ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR,
            "Warning: " + warning_message,
            job_id=worker.current_job_id,
        )
def is_main_thread():
    """Return True when the calling thread is named "MainThread".

    Uses the `name` attribute instead of `Thread.getName()`, which is
    deprecated since Python 3.10.
    """
    return threading.current_thread().name == "MainThread"
def detect_fate_sharing_support_win32():
    """Detect (once) whether Windows job objects can be used for fate-sharing.

    On the first call on win32 this creates a job object, configures its
    limit flags and caches the outcome in the module globals `win32_job` and
    `win32_AssignProcessToJobObject`. A failed detection is cached as False,
    so later calls return immediately.

    Returns:
        True if a configured job object is available, False otherwise.
    """
    global win32_job, win32_AssignProcessToJobObject
    # `win32_job is None` means detection has not run yet.
    if win32_job is None and sys.platform == "win32":
        import ctypes

        try:
            from ctypes.wintypes import BOOL, DWORD, HANDLE, LPCWSTR, LPVOID

            # Declare the signatures of the kernel32 functions used below.
            kernel32 = ctypes.WinDLL("kernel32")
            kernel32.CreateJobObjectW.argtypes = (LPVOID, LPCWSTR)
            kernel32.CreateJobObjectW.restype = HANDLE
            sijo_argtypes = (HANDLE, ctypes.c_int, LPVOID, DWORD)
            kernel32.SetInformationJobObject.argtypes = sijo_argtypes
            kernel32.SetInformationJobObject.restype = BOOL
            kernel32.AssignProcessToJobObject.argtypes = (HANDLE, HANDLE)
            kernel32.AssignProcessToJobObject.restype = BOOL
            kernel32.IsDebuggerPresent.argtypes = ()
            kernel32.IsDebuggerPresent.restype = BOOL
        except (AttributeError, TypeError, ImportError):
            # Any of the APIs being unavailable disables fate-sharing.
            kernel32 = None
        job = kernel32.CreateJobObjectW(None, None) if kernel32 else None
        # Wrap the raw handle in subprocess.Handle when creation succeeded.
        job = subprocess.Handle(job) if job else job
        if job:
            from ctypes.wintypes import DWORD, LARGE_INTEGER, ULARGE_INTEGER

            # ctypes mirrors of the structures passed to
            # SetInformationJobObject.
            class JOBOBJECT_BASIC_LIMIT_INFORMATION(ctypes.Structure):
                _fields_ = [
                    ("PerProcessUserTimeLimit", LARGE_INTEGER),
                    ("PerJobUserTimeLimit", LARGE_INTEGER),
                    ("LimitFlags", DWORD),
                    ("MinimumWorkingSetSize", ctypes.c_size_t),
                    ("MaximumWorkingSetSize", ctypes.c_size_t),
                    ("ActiveProcessLimit", DWORD),
                    ("Affinity", ctypes.c_size_t),
                    ("PriorityClass", DWORD),
                    ("SchedulingClass", DWORD),
                ]

            class IO_COUNTERS(ctypes.Structure):
                _fields_ = [
                    ("ReadOperationCount", ULARGE_INTEGER),
                    ("WriteOperationCount", ULARGE_INTEGER),
                    ("OtherOperationCount", ULARGE_INTEGER),
                    ("ReadTransferCount", ULARGE_INTEGER),
                    ("WriteTransferCount", ULARGE_INTEGER),
                    ("OtherTransferCount", ULARGE_INTEGER),
                ]

            class JOBOBJECT_EXTENDED_LIMIT_INFORMATION(ctypes.Structure):
                _fields_ = [
                    ("BasicLimitInformation", JOBOBJECT_BASIC_LIMIT_INFORMATION),
                    ("IoInfo", IO_COUNTERS),
                    ("ProcessMemoryLimit", ctypes.c_size_t),
                    ("JobMemoryLimit", ctypes.c_size_t),
                    ("PeakProcessMemoryUsed", ctypes.c_size_t),
                    ("PeakJobMemoryUsed", ctypes.c_size_t),
                ]

            debug = kernel32.IsDebuggerPresent()

            # Defined in <WinNT.h>; also available here:
            # https://docs.microsoft.com/en-us/windows/win32/api/jobapi2/nf-jobapi2-setinformationjobobject
            JobObjectExtendedLimitInformation = 9
            JOB_OBJECT_LIMIT_BREAKAWAY_OK = 0x00000800
            JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION = 0x00000400
            JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE = 0x00002000
            buf = JOBOBJECT_EXTENDED_LIMIT_INFORMATION()
            # KILL_ON_JOB_CLOSE is left out while a debugger is attached.
            buf.BasicLimitInformation.LimitFlags = (
                (0 if debug else JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE)
                | JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION
                | JOB_OBJECT_LIMIT_BREAKAWAY_OK
            )
            infoclass = JobObjectExtendedLimitInformation
            if not kernel32.SetInformationJobObject(
                job, infoclass, ctypes.byref(buf), ctypes.sizeof(buf)
            ):
                # Configuring the job failed; treat fate-sharing as unsupported.
                job = None
        win32_AssignProcessToJobObject = (
            kernel32.AssignProcessToJobObject if kernel32 is not None else False
        )
        # Cache False (not None) on failure so detection is not repeated.
        win32_job = job if job else False
    return bool(win32_job)
def detect_fate_sharing_support_linux():
    """Detect (once) whether `prctl` is callable through ctypes on Linux.

    The outcome is cached in the module global `linux_prctl`: the configured
    ctypes function on success, False on failure.

    Returns:
        True if `prctl` is available, False otherwise.
    """
    global linux_prctl

    already_probed = linux_prctl is not None
    if already_probed or not sys.platform.startswith("linux"):
        return bool(linux_prctl)

    try:
        from ctypes import CDLL, c_int, c_ulong

        prctl = CDLL(None).prctl
        prctl.restype = c_int
        prctl.argtypes = [c_int, c_ulong, c_ulong, c_ulong, c_ulong]
    except (AttributeError, TypeError):
        prctl = None

    # Store False rather than None so the probe is not repeated.
    linux_prctl = prctl or False
    return bool(linux_prctl)
def detect_fate_sharing_support():
    """Report whether fate-sharing is supported on the current platform.

    Returns:
        The result of the win32 or Linux detection routine, or None on any
        other platform.
    """
    if sys.platform == "win32":
        return detect_fate_sharing_support_win32()
    if sys.platform.startswith("linux"):
        return detect_fate_sharing_support_linux()
    return None
def set_kill_on_parent_death_linux():
    """Ensures this process dies if its parent dies (fate-sharing).

    Linux-only. Must be called in preexec_fn (i.e. by the child).

    Raises:
        AssertionError: If prctl support was not detected. Raised explicitly
            so the check is not stripped when running under `python -O`.
        OSError: If the prctl(PR_SET_PDEATHSIG) call returns non-zero.
    """
    if not detect_fate_sharing_support_linux():
        raise AssertionError("PR_SET_PDEATHSIG used despite being unavailable")

    PR_SET_PDEATHSIG = 1
    # Ask for SIGKILL to be delivered to this process when its parent dies.
    if linux_prctl(PR_SET_PDEATHSIG, signal.SIGKILL, 0, 0, 0) != 0:
        import ctypes

        raise OSError(ctypes.get_errno(), "prctl(PR_SET_PDEATHSIG) failed")
def set_kill_child_on_death_win32(child_proc):
    """Ensures the child process dies if this process dies (fate-sharing).

    Windows-only. Must be called by the parent, after spawning the child.

    Args:
        child_proc: The subprocess.Popen or subprocess.Handle object.

    Raises:
        AssertionError: If `child_proc` is not a Popen/Handle, or if job
            object support was not detected. The latter is raised explicitly
            so the check is not stripped when running under `python -O`.
        OSError: If AssignProcessToJobObject() fails.
    """
    if isinstance(child_proc, subprocess.Popen):
        # Popen keeps the process handle in its private `_handle` attribute.
        child_proc = child_proc._handle
    assert isinstance(child_proc, subprocess.Handle)

    if not detect_fate_sharing_support_win32():
        raise AssertionError(
            "AssignProcessToJobObject used despite being unavailable"
        )

    if not win32_AssignProcessToJobObject(win32_job, int(child_proc)):
        import ctypes

        raise OSError(ctypes.get_last_error(), "AssignProcessToJobObject() failed")
def set_sigterm_handler(sigterm_handler):
    """Registers a handler for SIGTERM in a platform-compatible manner.

    Args:
        sigterm_handler: Handler passed to ``signal.signal``.
    """
    # On Windows the handler is attached to SIGBREAK (Ctrl+Break) instead.
    # Note that these signal handlers only work for console applications.
    # TODO(mehrdadn): implement graceful process termination mechanism
    termination_signal = (
        signal.SIGBREAK if sys.platform == "win32" else signal.SIGTERM
    )
    signal.signal(termination_signal, sigterm_handler)
def try_to_symlink(symlink_path, target_path):
    """Attempt to create a symlink.

    If the symlink path exists and isn't a symlink, the symlink will not be
    created. If a symlink exists in the path (even a dangling one), it will
    be attempted to be removed and replaced. All OS errors are swallowed:
    this is a best-effort operation.

    Args:
        symlink_path: The path at which to create the symlink.
        target_path: The path the symlink should point to.
    """
    symlink_path = os.path.expanduser(symlink_path)
    target_path = os.path.expanduser(target_path)

    # lexists() does not follow links. exists() would report False for a
    # symlink whose target is gone, so the stale link would never be removed
    # and the os.symlink() below would fail silently.
    if os.path.lexists(symlink_path):
        if not os.path.islink(symlink_path):
            # There's an existing non-symlink file, don't overwrite it.
            return
        # Try to remove existing symlink.
        try:
            os.remove(symlink_path)
        except OSError:
            return

    try:
        os.symlink(target_path, symlink_path)
    except OSError:
        return
def get_user():
    """Return the login name of the current user.

    Returns:
        The ``pw_name`` for the current uid, or an empty string when the
        ``pwd`` module is unavailable or the lookup fails for any reason.
    """
    if pwd is not None:
        try:
            return pwd.getpwuid(os.getuid()).pw_name
        except Exception:
            # Fall through to the empty-string default below.
            pass
    return ""
def get_conda_bin_executable(executable_name):
    """Return the path to an executable inside conda's 'bin' directory.

    The directory is taken from the ``CONDA_EXE`` environment variable, as per
    https://github.com/conda/conda/issues/7126. Adapted from
    https://github.com/mlflow/mlflow.

    Args:
        executable_name: Name of the executable to locate.

    Returns:
        ``executable_name`` joined onto the directory of ``CONDA_EXE``, or
        ``executable_name`` unchanged when ``CONDA_EXE`` is not set.
    """
    conda_exe = os.environ.get("CONDA_EXE")
    if conda_exe is None:
        return executable_name
    return os.path.join(os.path.dirname(conda_exe), executable_name)
def get_conda_env_dir(env_name):
    """Find and validate the conda directory for a given conda environment.

    For example, given the environment name `tf1`, this function checks
    the existence of the corresponding conda directory, e.g.
    `/Users/scaly/anaconda3/envs/tf1`, and returns it.

    Args:
        env_name: Name of the desired conda environment (``"base"`` for the
            base environment).

    Returns:
        The directory of the requested environment.

    Raises:
        ValueError: If neither ``CONDA_PREFIX`` nor ``CONDA_EXE`` is set, or
            if the computed environment directory does not exist.
    """
    conda_prefix = os.environ.get("CONDA_PREFIX")
    # True when `conda_prefix` is known to be the conda installation root
    # (e.g. $HOME/anaconda3) rather than the directory of a user-created env.
    prefix_is_install_root = False
    if conda_prefix is None:
        # The caller is neither in a conda env or in (base) env. This is rare
        # because by default, new terminals start in (base), but we can still
        # support this case.
        conda_exe = os.environ.get("CONDA_EXE")
        if conda_exe is None:
            raise ValueError(
                "Cannot find environment variables set by conda. "
                "Please verify conda is installed."
            )
        # Example: CONDA_EXE=$HOME/anaconda3/bin/conda
        # Strip out /bin/conda by going up two parent directories. The result
        # is the installation root, so it must be handled like (base) below
        # rather than like $HOME/anaconda3/envs/$current_env_name.
        conda_prefix = str(Path(conda_exe).parent.parent)
        prefix_is_install_root = True

    # There are two cases:
    # 1. `conda_prefix` is the installation root: either we are in the (base)
    #    env (CONDA_DEFAULT_ENV=base, CONDA_PREFIX=$HOME/anaconda3) or the
    #    prefix was derived from CONDA_EXE above.
    # 2. We are in a user-created conda env: CONDA_DEFAULT_ENV=$env_name and
    #    CONDA_PREFIX=$HOME/anaconda3/envs/$current_env_name
    if prefix_is_install_root or os.environ.get("CONDA_DEFAULT_ENV") == "base":
        # Running from (base) is not recommended by conda, but we can still
        # support it.
        if env_name == "base":
            # Desired environment is (base), located at e.g. $HOME/anaconda3
            env_dir = conda_prefix
        else:
            # Desired environment is user-created, e.g.
            # $HOME/anaconda3/envs/$env_name
            env_dir = os.path.join(conda_prefix, "envs", env_name)
    else:
        # Now `conda_prefix` should be something like
        # $HOME/anaconda3/envs/$current_env_name
        # We want to replace the last component with the desired env name.
        conda_envs_dir = os.path.split(conda_prefix)[0]
        env_dir = os.path.join(conda_envs_dir, env_name)

    if not os.path.isdir(env_dir):
        raise ValueError(
            "conda env "
            + env_name
            + " not found in conda envs directory. Run `conda env list` to "
            + "verify the name is correct."
        )
    return env_dir
def get_ray_doc_version():
    """Get the docs.ray.io version corresponding to the ray.__version__.

    Returns:
        ``"releases-<version>"`` when ``ray.__version__`` is a plain
        ``X.Y.Z`` release number, otherwise ``"master"``.
    """
    current_version = ray.__version__
    # Official Ray releases (such as 1.12.0) have a matching doc version.
    if re.match(r"^\d+\.\d+\.\d+$", current_version):
        return f"releases-{current_version}"
    # Dev builds (3.0.0dev0) and release candidates (2.0.0rc0) map to the
    # master doc version at docs.ray.io.
    return "master"
- # Used to only print a deprecation warning once for a given function if we
- # don't wish to spam the caller.
def get_wheel_filename(
    sys_platform: str = sys.platform,
    ray_version: str = ray.__version__,
    py_version: Tuple[int, int] = (sys.version_info.major, sys.version_info.minor),
    architecture: Optional[str] = None,
) -> str:
    """Returns the filename used for the nightly Ray wheel.

    Args:
        sys_platform: The platform as returned by sys.platform. Examples:
            "darwin", "linux", "win32"
        ray_version: The Ray version as returned by ray.__version__ or
            `ray --version`. Examples: "3.0.0.dev0"
        py_version: The Python version as returned by sys.version_info. A
            tuple of (major, minor). Examples: (3, 8)
        architecture: Architecture, e.g. ``x86_64`` or ``aarch64``. If None, will
            be determined by calling ``platform.processor()``.

    Returns:
        The wheel file name. Examples:
            ray-3.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl
    """
    assert py_version in ray_constants.RUNTIME_ENV_CONDA_PY_VERSIONS, py_version
    python_tag = "".join(str(component) for component in py_version)

    if not architecture:
        architecture = platform.processor()

    assert sys_platform in ["darwin", "linux", "win32"], sys_platform

    if sys_platform == "darwin":
        # Anything that is not x86_64 is treated as Apple silicon.
        platform_tag = (
            "macosx_12_0_x86_64" if architecture == "x86_64" else "macosx_12_0_arm64"
        )
    elif sys_platform == "linux":
        platform_tag = (
            "manylinux2014_aarch64"
            if architecture in ("aarch64", "arm64")
            else "manylinux2014_x86_64"
        )
    elif sys_platform == "win32":
        platform_tag = "win_amd64"

    # Only the 3.7 wheels carry the "m" ABI suffix.
    abi_suffix = "m" if python_tag in ["37"] else ""
    return f"ray-{ray_version}-cp{python_tag}-cp{python_tag}{abi_suffix}-{platform_tag}.whl"
def get_master_wheel_url(
    ray_commit: str = ray.__commit__,
    sys_platform: str = sys.platform,
    ray_version: str = ray.__version__,
    py_version: Tuple[int, int] = sys.version_info[:2],
) -> str:
    """Return the URL for the wheel from a specific commit.

    Args:
        ray_commit: Commit hash used as the directory name in the bucket.
        sys_platform: Platform string forwarded to ``get_wheel_filename``.
        ray_version: Ray version forwarded to ``get_wheel_filename``.
        py_version: (major, minor) tuple forwarded to ``get_wheel_filename``.

    Returns:
        The wheel URL under the ``master/`` prefix.
    """
    wheel_name = get_wheel_filename(
        sys_platform=sys_platform, ray_version=ray_version, py_version=py_version
    )
    bucket_prefix = "https://s3-us-west-2.amazonaws.com/ray-wheels/master/"
    return f"{bucket_prefix}{ray_commit}/{wheel_name}"
def get_release_wheel_url(
    ray_commit: str = ray.__commit__,
    sys_platform: str = sys.platform,
    ray_version: str = ray.__version__,
    py_version: Tuple[int, int] = sys.version_info[:2],
) -> str:
    """Return the URL for the wheel for a specific release.

    Args:
        ray_commit: Commit hash of the release build.
        sys_platform: Platform string forwarded to ``get_wheel_filename``.
        ray_version: Ray version; used both in the path and the file name.
        py_version: (major, minor) tuple forwarded to ``get_wheel_filename``.

    Returns:
        The wheel URL under the ``releases/<version>/<commit>/`` prefix.
    """
    wheel_name = get_wheel_filename(
        sys_platform=sys_platform, ray_version=ray_version, py_version=py_version
    )
    bucket_prefix = "https://ray-wheels.s3-us-west-2.amazonaws.com/releases/"
    return f"{bucket_prefix}{ray_version}/{ray_commit}/{wheel_name}"
- # e.g. https://ray-wheels.s3-us-west-2.amazonaws.com/releases/1.4.0rc1/e7c7
- # f6371a69eb727fa469e4cd6f4fbefd143b4c/ray-1.4.0rc1-cp36-cp36m-manylinux201
- # 4_x86_64.whl
def validate_namespace(namespace: str):
    """Check that ``namespace`` is a non-empty string.

    Args:
        namespace: The namespace to validate.

    Raises:
        TypeError: If ``namespace`` is not a ``str`` instance.
        ValueError: If ``namespace`` is the empty string.
    """
    if not isinstance(namespace, str):
        raise TypeError("namespace must be None or a string.")
    if namespace == "":
        raise ValueError(
            '"" is not a valid namespace. Pass None to not specify a namespace.'
        )
def get_dashboard_dependency_error() -> Optional[ImportError]:
    """Returns the exception error if Ray Dashboard dependencies are not installed.

    None if they are installed.

    Checks to see if we should start the dashboard agent or not based on the
    Ray installation version the user has installed (ray vs. ray[default]).
    Unfortunately there doesn't seem to be a cleaner way to detect this other
    than just blindly importing the relevant packages.
    """
    try:
        import ray.dashboard.optional_deps  # noqa: F401
    except ImportError as import_error:
        return import_error
    return None
def get_ray_client_dependency_error() -> Optional[ImportError]:
    """Returns the exception error if Ray Client dependencies are not installed.

    None if they are installed. The check is an attempted ``import grpc``.
    """
    try:
        import grpc  # noqa: F401
    except ImportError as import_error:
        return import_error
    return None
# Warning template logged by the internal_kv_*_with_retry helpers when a GCS
# RPC fails with an UNAVAILABLE or UNKNOWN status code. The single `{}`
# placeholder is filled with the GCS client address via str.format().
connect_error = (
    "Unable to connect to GCS (ray head) at {}. "
    "Check that (1) Ray with matching version started "
    "successfully at the specified address, (2) this "
    "node can reach the specified address, and (3) there is "
    "no firewall setting preventing access."
)
def internal_kv_list_with_retry(gcs_client, prefix, namespace, num_retries=20):
    """List internal KV keys under ``prefix``, retrying on failure.

    Args:
        gcs_client: Client exposing ``internal_kv_keys`` and ``address``.
        prefix: Key prefix; a ``str`` is encoded to bytes first.
        namespace: KV namespace; a ``str`` is encoded to bytes first.
        num_retries: Maximum number of attempts.

    Returns:
        Whatever ``gcs_client.internal_kv_keys`` returned on the first attempt
        that produced a non-None result.

    Raises:
        ConnectionError: If no attempt produced a non-None result.
    """
    if isinstance(prefix, str):
        prefix = prefix.encode()
    if isinstance(namespace, str):
        namespace = namespace.encode()

    for _attempt in range(num_retries):
        keys = None
        try:
            keys = gcs_client.internal_kv_keys(prefix, namespace)
        except Exception as err:
            gcs_unreachable = isinstance(
                err, ray.exceptions.RpcError
            ) and err.rpc_code in (
                ray._raylet.GRPC_STATUS_CODE_UNAVAILABLE,
                ray._raylet.GRPC_STATUS_CODE_UNKNOWN,
            )
            if gcs_unreachable:
                logger.warning(connect_error.format(gcs_client.address))
            else:
                logger.exception("Internal KV List failed")
            keys = None

        if keys is not None:
            return keys
        # Every failed attempt (including the last) waits before moving on.
        logger.debug(f"Fetched {prefix}=None from KV. Retrying.")
        time.sleep(2)

    raise ConnectionError(
        f"Could not list '{prefix}' from GCS. Did GCS start successfully?"
    )
def internal_kv_get_with_retry(gcs_client, key, namespace, num_retries=20):
    """Read ``key`` from the internal KV store, retrying on failure.

    Args:
        gcs_client: Client exposing ``internal_kv_get`` and ``address``.
        key: The key to read; a ``str`` is encoded to bytes first.
        namespace: KV namespace; a ``str`` is encoded to bytes first, matching
            the list/put retry helpers.
        num_retries: Maximum number of attempts.

    Returns:
        The value returned by ``gcs_client.internal_kv_get``.

    Raises:
        ConnectionError: If no attempt produced a value, or the value obtained
            is empty.
    """
    result = None
    if isinstance(key, str):
        key = key.encode()
    # Consistency with internal_kv_list_with_retry / internal_kv_put_with_retry:
    # accept a str namespace instead of passing it through unencoded.
    if isinstance(namespace, str):
        namespace = namespace.encode()
    for _ in range(num_retries):
        try:
            result = gcs_client.internal_kv_get(key, namespace)
        except Exception as e:
            if isinstance(e, ray.exceptions.RpcError) and e.rpc_code in (
                ray._raylet.GRPC_STATUS_CODE_UNAVAILABLE,
                ray._raylet.GRPC_STATUS_CODE_UNKNOWN,
            ):
                logger.warning(connect_error.format(gcs_client.address))
            else:
                logger.exception("Internal KV Get failed")
            result = None

        if result is not None:
            break
        else:
            logger.debug(f"Fetched {key}=None from KV. Retrying.")
            time.sleep(2)
    if not result:
        raise ConnectionError(
            f"Could not read '{key.decode()}' from GCS. Did GCS start successfully?"
        )
    return result
def parse_resources_json(
    resources: str, cli_logger, cf, command_arg="--resources"
) -> Dict[str, float]:
    """Parse the JSON string given for a resources command-line option.

    Args:
        resources: JSON text expected to decode to a dict.
        cli_logger: Logger whose ``error`` and ``abort`` methods report failures.
        cf: Formatter whose ``bold`` method is used in the messages.
        command_arg: Option name shown in the messages.

    Returns:
        The decoded dict. If decoding fails and ``cli_logger.abort`` returns
        normally, the last value bound to ``resources`` is returned.
    """
    try:
        resources = json.loads(resources)
        if isinstance(resources, dict):
            return resources
        raise ValueError("The format after deserialization is not a dict")
    except Exception as parse_error:
        offending_arg = cf.bold(f"{command_arg}={resources}")
        cli_logger.error(
            "`{}` is not a valid JSON string, detail error:{}",
            offending_arg,
            str(parse_error),
        )
        example_arg = cf.bold(
            f'{command_arg}=\'{{"CustomResource3": 1, "CustomResource2": 2}}\''
        )
        cli_logger.abort("Valid values look like this: `{}`", example_arg)
    return resources
def parse_metadata_json(
    metadata: str, cli_logger, cf, command_arg="--metadata-json"
) -> Dict[str, str]:
    """Parse the JSON string given for a metadata command-line option.

    Args:
        metadata: JSON text expected to decode to a dict.
        cli_logger: Logger whose ``error`` and ``abort`` methods report failures.
        cf: Formatter whose ``bold`` method is used in the messages.
        command_arg: Option name shown in the messages.

    Returns:
        The decoded dict. If decoding fails and ``cli_logger.abort`` returns
        normally, the last value bound to ``metadata`` is returned.
    """
    try:
        metadata = json.loads(metadata)
        if isinstance(metadata, dict):
            return metadata
        raise ValueError("The format after deserialization is not a dict")
    except Exception as parse_error:
        offending_arg = cf.bold(f"{command_arg}={metadata}")
        cli_logger.error(
            "`{}` is not a valid JSON string, detail error:{}",
            offending_arg,
            str(parse_error),
        )
        example_arg = cf.bold(
            f'{command_arg}=\'{{"key1": "value1", "key2": "value2"}}\''
        )
        cli_logger.abort("Valid values look like this: `{}`", example_arg)
    return metadata
def internal_kv_put_with_retry(gcs_client, key, value, namespace, num_retries=20):
    """Write ``key``/``value`` to the internal KV store, retrying RPC errors.

    Args:
        gcs_client: Client exposing ``internal_kv_put`` and ``address``.
        key: The key to write; a ``str`` is encoded to bytes first.
        value: The value to write; a ``str`` is encoded to bytes first.
        namespace: KV namespace; a ``str`` is encoded to bytes first.
        num_retries: Maximum number of attempts.

    Returns:
        The return value of the first successful ``internal_kv_put`` call,
        which is made with ``overwrite=True``.

    Raises:
        ray.exceptions.RpcError: The last RPC error once all attempts failed.
            Other exception types propagate immediately.
    """

    def _to_bytes(item):
        # Only str is converted; bytes and None pass through untouched.
        return item.encode() if isinstance(item, str) else item

    key = _to_bytes(key)
    value = _to_bytes(value)
    namespace = _to_bytes(namespace)

    last_error = None
    for _attempt in range(num_retries):
        try:
            return gcs_client.internal_kv_put(
                key, value, overwrite=True, namespace=namespace
            )
        except ray.exceptions.RpcError as rpc_error:
            gcs_unreachable = rpc_error.rpc_code in (
                ray._raylet.GRPC_STATUS_CODE_UNAVAILABLE,
                ray._raylet.GRPC_STATUS_CODE_UNKNOWN,
            )
            if gcs_unreachable:
                logger.warning(connect_error.format(gcs_client.address))
            else:
                logger.exception("Internal KV Put failed")
            time.sleep(2)
            last_error = rpc_error
    # Reraise the last error.
    raise last_error
def compute_version_info():
    """Compute the versions of Ray and Python for this process.

    Returns:
        A ``(ray_version, python_version)`` tuple, where ``python_version`` is
        the dotted ``major.minor.micro`` string of the running interpreter.
    """
    ray_version = ray.__version__
    python_version = ".".join(str(part) for part in sys.version_info[:3])
    return ray_version, python_version
def get_directory_size_bytes(path: Union[str, Path] = ".") -> int:
    """Get the total size of a directory in bytes, including subdirectories.

    Symbolic links and ``.pyc`` files are not counted.

    Args:
        path: Root directory to measure.

    Returns:
        The summed size in bytes of all counted files under ``path``.
    """

    def _counted_files():
        # Yield the full path of every file that contributes to the total.
        for parent, _subdirs, names in os.walk(path):
            for name in names:
                candidate = os.path.join(parent, name)
                if os.path.islink(candidate) or name.endswith(".pyc"):
                    continue
                yield candidate

    return sum(os.path.getsize(file_path) for file_path in _counted_files())
def check_version_info(
    cluster_metadata,
    this_process_address,
    raise_on_mismatch=True,
    python_version_match_level=None,
):
    """Check if the Python and Ray versions stored in GCS matches this process.

    Args:
        cluster_metadata: Ray cluster metadata from GCS. Its ``ray_version``
            and ``python_version`` entries are read.
        this_process_address: Informational only. The address of this process.
            e.g. "node address:port" or "Ray Client".
        raise_on_mismatch: Raise an exception on True, log a warning otherwise.
        python_version_match_level: "minor" or "patch". To which python version
            level we try to match. Defaults to the
            `RAY_DEFAULT_PYTHON_VERSION_MATCH_LEVEL` environment variable if it
            exists, otherwise "patch".

    Behavior:
        - A mismatch is reported (raised or logged, per raise_on_mismatch) if:
            - Ray versions do not match; OR
            - Python (major, minor) versions do not match,
                if python_version_match_level == 'minor'; OR
            - Python full version strings do not match,
                if python_version_match_level == 'patch'.
        - When there is no mismatch at the requested level but the full Python
          version strings still differ, a warning is logged.

    Raises:
        RuntimeError: On a version mismatch when raise_on_mismatch is True.
        ValueError: If python_version_match_level is not 'minor' or 'patch'.
    """
    if python_version_match_level is None:
        python_version_match_level = os.environ.get(
            "RAY_DEFAULT_PYTHON_VERSION_MATCH_LEVEL", "patch"
        )

    cluster_ray = cluster_metadata["ray_version"]
    cluster_python = cluster_metadata["python_version"]
    local_ray, local_python = compute_version_info()

    ray_matches = cluster_ray == local_ray
    python_full_matches = cluster_python == local_python

    if python_version_match_level == "patch":
        python_matches = cluster_python == local_python
    elif python_version_match_level == "minor":
        # Compare only the first two dotted components.
        local_parts = local_python.split(".")
        cluster_parts = cluster_python.split(".")
        python_matches = local_parts[:2] == cluster_parts[:2]
    else:
        raise ValueError(
            f"Invalid python_version_match_level: {python_version_match_level}, "
            "want: 'minor' or 'patch'"
        )

    mismatch_msg = (
        "The cluster was started with:\n"
        f" Ray: {cluster_ray}\n"
        f" Python: {cluster_python}\n"
        f"This process on {this_process_address} was started with:\n"
        f" Ray: {local_ray}\n"
        f" Python: {local_python}\n"
    )

    if ray_matches and python_matches:
        if not python_full_matches:
            logger.warning(f"Python patch version mismatch: {mismatch_msg}")
        return

    error_message = f"Version mismatch: {mismatch_msg}"
    if raise_on_mismatch:
        raise RuntimeError(error_message)
    logger.warning(error_message)
def get_runtime_env_info(
    runtime_env: "RuntimeEnv",
    *,
    is_job_runtime_env: bool = False,
    serialize: bool = False,
):
    """Create runtime env info from runtime env.

    In the user interface, the argument `runtime_env` contains some fields
    which not contained in `ProtoRuntimeEnv` but in `ProtoRuntimeEnvInfo`,
    such as `eager_install`. This function will extract those fields from
    `RuntimeEnv` and create a new `ProtoRuntimeEnvInfo`, and serialize it
    into json format.

    Args:
        runtime_env: The runtime env to convert.
        is_job_runtime_env: When True, `eager_install` is set to True if the
            config does not specify it.
        serialize: When True, return the JSON form produced by
            `json_format.MessageToJson` instead of the proto message.

    Returns:
        The populated `ProtoRuntimeEnvInfo`, or its JSON string when
        `serialize` is True.

    Raises:
        TypeError: If the config's `eager_install` is set but is not a bool.
    """
    from ray.runtime_env import RuntimeEnvConfig

    proto_runtime_env_info = ProtoRuntimeEnvInfo()

    # Copy the URIs only when the runtime env actually carries them.
    if runtime_env.working_dir_uri():
        proto_runtime_env_info.uris.working_dir_uri = runtime_env.working_dir_uri()

    if len(runtime_env.py_modules_uris()) > 0:
        proto_runtime_env_info.uris.py_modules_uris[:] = runtime_env.py_modules_uris()

    # TODO(Catch-Bull): overload `__setitem__` for `RuntimeEnv`, change the
    # runtime_env of all internal code from dict to RuntimeEnv.

    # Fall back to the default config when none is given; otherwise validate
    # what the caller supplied.
    runtime_env_config = runtime_env.get("config")
    if runtime_env_config is None:
        runtime_env_config = RuntimeEnvConfig.default_config()
    else:
        runtime_env_config = RuntimeEnvConfig.parse_and_validate_runtime_env_config(
            runtime_env_config
        )

    proto_runtime_env_info.runtime_env_config.CopyFrom(
        runtime_env_config.build_proto_runtime_env_config()
    )

    # Normally, `RuntimeEnv` should guarantee the accuracy of field eager_install,
    # but so far, the internal code has not completely prohibited direct
    # modification of fields in RuntimeEnv, so we should check it for insurance.
    eager_install = (
        runtime_env_config.get("eager_install")
        if runtime_env_config is not None
        else None
    )
    # Only touch the proto field for job runtime envs or when the config sets
    # eager_install explicitly.
    if is_job_runtime_env or eager_install is not None:
        if eager_install is None:
            eager_install = True
        elif not isinstance(eager_install, bool):
            raise TypeError(
                f"eager_install must be a boolean. got {type(eager_install)}"
            )
        proto_runtime_env_info.runtime_env_config.eager_install = eager_install

    proto_runtime_env_info.serialized_runtime_env = runtime_env.serialize()

    if not serialize:
        return proto_runtime_env_info

    return json_format.MessageToJson(proto_runtime_env_info)
def parse_runtime_env_for_task_or_actor(
    runtime_env: Optional[Union[Dict, "RuntimeEnv"]]
):
    """Normalize the ``runtime_env`` option of a task or actor.

    Args:
        runtime_env: A dict (or dict-like ``RuntimeEnv``), or a falsy value.

    Returns:
        A validated ``RuntimeEnv`` built from ``runtime_env``, or None when
        ``runtime_env`` is falsy.

    Raises:
        TypeError: If ``runtime_env`` is truthy but not a dict instance.
    """
    from ray.runtime_env import RuntimeEnv
    from ray.runtime_env.runtime_env import _validate_no_local_paths

    if not runtime_env:
        # Keep the new_runtime_env as None. In .remote(), we need to know
        # if runtime_env is None to know whether or not to fall back to the
        # runtime_env specified in the @ray.remote decorator.
        return None

    # Parse local pip/conda config files here. If we instead did it in
    # .remote(), it would get run in the Ray Client server, which runs on
    # a remote node where the files aren't available.
    if isinstance(runtime_env, dict):
        runtime_env = RuntimeEnv(**runtime_env)
        _validate_no_local_paths(runtime_env)
        return runtime_env

    # A single message string: the previous form passed two positional
    # arguments, so the error rendered as a tuple.
    raise TypeError(
        f"runtime_env must be dict or RuntimeEnv, but got: {type(runtime_env)}"
    )
def split_address(address: str) -> Tuple[str, str]:
    """Splits address into a module string (scheme) and an inner_address.

    We use a custom splitting function instead of urllib because
    PEP allows "underscores" in a module names, while URL schemes do not
    allow them.

    Args:
        address: The address to split.

    Returns:
        A tuple of (scheme, inner_address). Only the first '://' separates
        the two parts.

    Raises:
        ValueError: If the address does not contain '://'.

    Examples:
        >>> split_address("ray://my_cluster")
        ('ray', 'my_cluster')
    """
    scheme, separator, remainder = address.partition("://")
    if not separator:
        raise ValueError("Address must contain '://'")
    return (scheme, remainder)
def get_entrypoint_name():
    """Get the entrypoint of the current script.

    Returns:
        The command line of the current process as a single string, prefixed
        with "(interactive_shell) " when running interactively, or "unknown"
        if it cannot be determined.
    """
    try:
        current_process = psutil.Process()
        # `sys.ps1` only exists in interactive mode, see
        # https://stackoverflow.com/questions/2356399/tell-if-python-is-in-interactive-mode # noqa
        label = "(interactive_shell) " if hasattr(sys, "ps1") else ""
        return label + list2cmdline(current_process.cmdline())
    except Exception:
        return "unknown"
class DeferSigint(contextlib.AbstractContextManager):
    """Context manager that defers SIGINT signals until the context is left.

    While the context is active, SIGINT only sets a flag, and attempts to
    install a new SIGINT handler from the main thread via ``signal.signal``
    are recorded instead of applied. On a clean exit the (possibly replaced)
    handler is restored and, if a SIGINT arrived meanwhile, acted upon.
    """

    # This is used by Ray's task cancellation to defer cancellation interrupts during
    # problematic areas, e.g. task argument deserialization.
    def __init__(self):
        # Whether a SIGINT signal was received during the context.
        self.signal_received = False
        # The overridden SIGINT handler
        self.overridden_sigint_handler = None
        # The original signal method.
        self.orig_signal = None

    @classmethod
    def create_if_main_thread(cls) -> contextlib.AbstractContextManager:
        """Creates a DeferSigint context manager if running on the main thread,
        returns a no-op context manager otherwise.
        """
        if threading.current_thread() == threading.main_thread():
            return cls()
        else:
            return contextlib.nullcontext()

    def _set_signal_received(self, signum, frame):
        """SIGINT handler that defers the signal."""
        self.signal_received = True

    def _signal_monkey_patch(self, signum, handler):
        """Monkey patch for signal.signal that defers the setting of new signal
        handler after the DeferSigint context exits."""
        # Only handle it in the main thread because if setting a handler in a non-main
        # thread, signal.signal will raise an error because Python doesn't allow it.
        if (
            threading.current_thread() == threading.main_thread()
            and signum == signal.SIGINT
        ):
            # Mimic signal.signal's contract: return the handler being replaced.
            orig_sigint_handler = self.overridden_sigint_handler
            self.overridden_sigint_handler = handler
            return orig_sigint_handler
        return self.orig_signal(signum, handler)

    def __enter__(self):
        # Save original SIGINT handler for later restoration.
        self.overridden_sigint_handler = signal.getsignal(signal.SIGINT)
        # Set SIGINT signal handler that defers the signal.
        signal.signal(signal.SIGINT, self._set_signal_received)
        # Monkey patch signal.signal so that a SIGINT handler registered within
        # the context is recorded and only installed when the context exits.
        self.orig_signal = signal.signal
        signal.signal = self._signal_monkey_patch
        return self

    def __exit__(self, exc_type, exc, exc_tb):
        assert self.overridden_sigint_handler is not None
        assert self.orig_signal is not None
        # Restore original signal.signal function.
        signal.signal = self.orig_signal
        # Restore overridden SIGINT handler.
        signal.signal(signal.SIGINT, self.overridden_sigint_handler)
        if exc_type is None and self.signal_received:
            # No exception raised in context, act on the deferred SIGINT.
            handler = self.overridden_sigint_handler
            if callable(handler):
                # By default, this means raising KeyboardInterrupt.
                handler(signal.SIGINT, None)
            elif handler == signal.SIG_DFL:
                # SIG_DFL is not callable. The handler is already restored, so
                # re-deliver the signal and let the OS default action apply.
                os.kill(os.getpid(), signal.SIGINT)
            # Otherwise the handler is SIG_IGN: the signal would have been
            # ignored outside the context too, so it is dropped.
        else:
            # If exception was raised in context, returning False will cause it to be
            # reraised.
            return False
def try_import_each_module(module_names_to_import: List[str]) -> None:
    """Make a best-effort attempt to import each named Python module.

    This is used by the Python default_worker.py to preload modules. A module
    that raises ImportError is logged and skipped.

    Args:
        module_names_to_import: Names of the modules to import.
    """
    for module_name in module_names_to_import:
        try:
            importlib.import_module(module_name)
        except ImportError:
            logger.exception(f'Failed to preload the module "{module_name}"')
def remove_ray_internal_flags_from_env(env: dict):
    """Remove Ray internal flags from `env`, in place.

    The flags are those listed in ``ray_constants.RAY_INTERNAL_FLAGS``
    (defined in ray/common/ray_internal_flag_def.h). Flags that are not
    present in `env` are ignored.

    Args:
        env: The mapping to strip the flags from.
    """
    for internal_flag in ray_constants.RAY_INTERNAL_FLAGS:
        env.pop(internal_flag, None)
def update_envs(env_vars: Dict[str, str]):
    """Set process environment variables, expanding references first.

    Each value is passed through ``os.path.expandvars`` so references to
    current environment variables are substituted. Any ``${X}`` reference
    left over (X made of upper-case letters, digits and underscores) is
    replaced with an empty string.

    Args:
        env_vars: Mapping of variable name to (possibly templated) value.
            A falsy mapping is a no-op.
    """
    if not env_vars:
        return

    for name, template in env_vars.items():
        substituted = os.path.expandvars(template)
        # Replace non-existent env vars with an empty string.
        os.environ[name] = re.sub(r"\$\{[A-Z0-9_]+\}", "", substituted)
def parse_pg_formatted_resources_to_original(
    pg_formatted_resources: Dict[str, float]
) -> Dict[str, float]:
    """Map placement-group formatted resource names back to plain names.

    Keys matching the indexed or wildcard placement group patterns are
    replaced by the pattern's first capture group. Keys matching neither
    pattern are copied unchanged.

    Args:
        pg_formatted_resources: Resource name to quantity mapping.

    Returns:
        A new dict keyed by the original resource names.
    """
    restored = {}
    for resource_name, quantity in pg_formatted_resources.items():
        indexed = PLACEMENT_GROUP_INDEXED_BUNDLED_RESOURCE_PATTERN.match(resource_name)
        if indexed and len(indexed.groups()) == 3:
            base_name = indexed.group(1)
            # Filter out resources that have bundle_group_[pg_id] since
            # it is an implementation detail.
            # This resource is automatically added to the resource
            # request for all tasks that require placement groups.
            if base_name != PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME:
                restored[base_name] = quantity
            continue

        wildcard = PLACEMENT_GROUP_WILDCARD_RESOURCE_PATTERN.match(resource_name)
        if wildcard and len(wildcard.groups()) == 2:
            base_name = wildcard.group(1)
            if base_name != "bundle":
                restored[base_name] = quantity
            continue

        restored[resource_name] = quantity

    return restored
def validate_actor_state_name(actor_state_name):
    """Check that ``actor_state_name`` is a known actor state.

    Args:
        actor_state_name: The state name to check; None is accepted.

    Raises:
        ValueError: If the name is neither None nor one of the known states.
    """
    if actor_state_name is None:
        return

    known_states = (
        "DEPENDENCIES_UNREADY",
        "PENDING_CREATION",
        "ALIVE",
        "RESTARTING",
        "DEAD",
    )
    if actor_state_name in known_states:
        return

    raise ValueError(
        f'"{actor_state_name}" is not a valid actor state name, '
        'it must be one of the following: "DEPENDENCIES_UNREADY", '
        '"PENDING_CREATION", "ALIVE", "RESTARTING", or "DEAD"'
    )
def get_current_node_cpu_model_name() -> Optional[str]:
    """Return the CPU model name read from /proc/cpuinfo.

    Returns:
        The text after the colon on the first line starting with
        "model name", or None on non-Linux platforms, when no such line
        exists, or when reading fails.
    """
    if not sys.platform.startswith("linux"):
        return None

    # /proc/cpuinfo content example:
    #   processor : 0
    #   vendor_id : GenuineIntel
    #   cpu family : 6
    #   model : 85
    #   model name : Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
    #   stepping : 7
    try:
        with open("/proc/cpuinfo", "r") as cpuinfo:
            first_model_line = next(
                (row for row in cpuinfo if row.startswith("model name")), None
            )
        if first_model_line is None:
            return None
        return first_model_line.split(":")[1].strip()
    except Exception:
        logger.debug("Failed to get CPU model name", exc_info=True)
        return None
def validate_socket_filepath(filepath: str):
    """Validate the provided filename is a valid Unix socket filename.

    Args:
        filepath: The socket path to check.

    Raises:
        OSError: If the UTF-8 encoded path is longer than the platform limit
            (103 bytes on macOS, 107 bytes elsewhere).
    """
    # Don't check for Windows as it doesn't support Unix sockets.
    if sys.platform == "win32":
        return

    # One byte is subtracted from the platform constant, presumably for the
    # terminating NUL of the socket address buffer.
    platform_limit = 104 if sys.platform.startswith("darwin") else 108
    maxlen = platform_limit - 1

    encoded_length = len(filepath.encode("utf-8"))
    if encoded_length <= maxlen:
        return
    raise OSError(
        f"validate_socket_filename failed: AF_UNIX path length cannot exceed {maxlen} bytes: {filepath}"
    )
# Cached answer to "are we running in a test, either local or CI?".
# None means the environment has not been inspected yet.
in_test = None


def is_in_test():
    """Report whether the process is running under a test harness.

    The environment is inspected on the first call only; the result is cached
    in the module-level ``in_test``.

    Returns:
        True if ``PYTEST_CURRENT_TEST`` or ``BUILDKITE`` is set in the
        environment, False otherwise.
    """
    global in_test
    if in_test is not None:
        return in_test
    # These environment variables are always set by pytest and Buildkite,
    # respectively.
    in_test = "PYTEST_CURRENT_TEST" in os.environ or "BUILDKITE" in os.environ
    return in_test
|