| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016 |
- """This is the module that is in charge of Ray usage report (telemetry) APIs.
- NOTE: Ray's usage report is currently "on by default".
- One could opt-out, see details at https://docs.ray.io/en/master/cluster/usage-stats.html. # noqa
- Ray usage report follows the specification from
- https://docs.ray.io/en/master/cluster/usage-stats.html#usage-stats-collection # noqa
- # Module
- The module consists of 2 parts.
- ## Public API
- It contains public APIs to obtain usage report information.
- APIs will be added before the usage report becomes opt-in by default.
- ## Internal APIs for usage processing/report
- The telemetry report consists of 5 components. This module is in charge of the top 2 layers.
- Report -> usage_lib
- ---------------------
- Usage data processing -> usage_lib
- ---------------------
- Data storage -> Ray API server
- ---------------------
- Aggregation -> Ray API server (currently a dashboard server)
- ---------------------
- Usage data collection -> Various components (Ray agent, GCS, etc.) + usage_lib (cluster metadata).
- Usage report is currently "on by default". You can disable the report by setting the environment variable
- RAY_USAGE_STATS_ENABLED=0. For example, `RAY_USAGE_STATS_ENABLED=0 ray start --head`.
- Or `RAY_USAGE_STATS_ENABLED=0 python [drivers with ray.init()]`.
- "Ray API server (currently a dashboard server)" reports the usage data to https://usage-stats.ray.io/.
- Data is reported every hour by default.
- Note that it is also possible to configure the interval using the environment variable,
- `RAY_USAGE_STATS_REPORT_INTERVAL_S`.
- To see collected/reported data, see `usage_stats.json` inside a temp
- folder (e.g., /tmp/ray/session_[id]/*).
- """
- import json
- import logging
- import os
- import platform
- import sys
- import threading
- import time
- from dataclasses import asdict, dataclass
- from enum import Enum, auto
- from pathlib import Path
- from typing import Dict, List, Optional, Set
- import requests
- import yaml
- import ray
- import ray._common.usage.usage_constants as usage_constant
- import ray._private.ray_constants as ray_constants
- from ray._raylet import GcsClient
- from ray.core.generated import gcs_pb2, usage_pb2
- from ray.experimental.internal_kv import (
- _internal_kv_initialized,
- _internal_kv_put,
- )
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Alias for the `TagKey` enum of the generated `usage_pb2` module; it is the
# key type accepted by `record_extra_usage_tag`.
TagKey = usage_pb2.TagKey
- #################
- # Internal APIs #
- #################
@dataclass
class ClusterConfigToReport:
    """Static cluster (autoscaler) configuration that goes into a usage report.

    Every field defaults to ``None``, which means the value is not known.
    """

    #: The `provider.type` value of the cluster config (e.g., aws).
    cloud_provider: Optional[str] = None
    #: The `min_workers` value of the cluster config.
    min_workers: Optional[int] = None
    #: The `max_workers` value of the cluster config.
    max_workers: Optional[int] = None
    #: The instance type of the head node.
    head_node_instance_type: Optional[str] = None
    #: The instance types of the worker nodes.
    worker_node_instance_types: Optional[List[str]] = None
@dataclass
class ClusterStatusToReport:
    """Cluster-wide resource totals that go into a usage report.

    Every field defaults to ``None``, which means the value is not known.
    """

    #: The total number of CPUs in the cluster.
    total_num_cpus: Optional[int] = None
    #: The total number of GPUs in the cluster.
    total_num_gpus: Optional[int] = None
    #: The total memory in the cluster, in GiB.
    total_memory_gb: Optional[float] = None
    #: The total object store memory in the cluster, in GiB.
    total_object_store_memory_gb: Optional[float] = None
@dataclass
class UsageStatsToReport:
    """The payload of a single usage report.

    Only the first three fields are required; every other field defaults to
    ``None`` when the corresponding information is not available.
    """

    #: Schema version of the report.
    schema_version: str
    #: Where the data comes from (i.e. OSS).
    source: str
    #: Timestamp (ms) at which the data was collected and reported.
    collect_timestamp_ms: int
    #: Number of successful reports over the lifetime of the cluster.
    total_success: Optional[int] = None
    #: Number of failed reports over the lifetime of the cluster.
    total_failed: Optional[int] = None
    #: Sequence number of this report.
    seq_number: Optional[int] = None
    #: Ray version in use.
    ray_version: Optional[str] = None
    #: Python version in use.
    python_version: Optional[str] = None
    #: Random id of the cluster session.
    session_id: Optional[str] = None
    #: Git commit hash of Ray (i.e. ray.__commit__).
    git_commit: Optional[str] = None
    #: Operating system in use.
    os: Optional[str] = None
    #: Timestamp (ms) at which the cluster was started.
    session_start_timestamp_ms: Optional[int] = None
    #: Cloud provider found in the cluster.yaml file (e.g., aws).
    cloud_provider: Optional[str] = None
    #: min_workers found in the cluster.yaml file.
    min_workers: Optional[int] = None
    #: max_workers found in the cluster.yaml file.
    max_workers: Optional[int] = None
    #: Head node instance type found in the cluster.yaml file
    # (e.g., i3.8xlarge).
    head_node_instance_type: Optional[str] = None
    #: Worker node instance types found in the cluster.yaml file
    # (e.g., i3.8xlarge).
    worker_node_instance_types: Optional[List[str]] = None
    #: Total number of CPUs in the cluster.
    total_num_cpus: Optional[int] = None
    #: Total number of GPUs in the cluster.
    total_num_gpus: Optional[int] = None
    #: Total size of memory in the cluster.
    total_memory_gb: Optional[float] = None
    #: Total size of object store memory in the cluster.
    total_object_store_memory_gb: Optional[float] = None
    #: Ray libraries that are used (e.g., rllib).
    library_usages: Optional[List[str]] = None
    #: Extra tags to report when specified by the environment variable
    # RAY_USAGE_STATS_EXTRA_TAGS.
    extra_usage_tags: Optional[Dict[str, str]] = None
    #: Number of alive nodes when the report is generated.
    total_num_nodes: Optional[int] = None
    #: Number of running jobs, excluding internal ones, when the report is
    # generated.
    total_num_running_jobs: Optional[int] = None
    #: libc version of the OS.
    libc_version: Optional[str] = None
    #: Hardware that is used (e.g. Intel Xeon).
    hardware_usages: Optional[List[str]] = None
@dataclass(init=True)
class UsageStatsToWrite:
    """Usage stats to write to `USAGE_STATS_FILE`

    We are writing extra metadata such as the status of report
    to this file.
    """

    #: The usage stats that were (or would have been) reported.
    usage_stats: UsageStatsToReport
    #: Whether or not the last report succeeded.
    success: bool
    #: The error message of the last report, or None when the report
    # succeeded (`generate_write_data` derives `success` from `error is None`).
    error: Optional[str]
class UsageStatsEnabledness(Enum):
    """Whether usage stats are on or off, and whether the user chose that."""

    # Turned on by the environment variable or the config file.
    ENABLED_EXPLICITLY = auto()
    # Turned off by the environment variable or the config file.
    DISABLED_EXPLICITLY = auto()
    # Neither the environment variable nor the config file expressed a choice.
    ENABLED_BY_DEFAULT = auto()
# Library usages recorded in this process; guarded by the lock below.
_recorded_library_usages = set()
_recorded_library_usages_lock = threading.Lock()
# Extra usage tags (key -> value) recorded in this process; guarded by the
# lock below.
_recorded_extra_usage_tags = {}
_recorded_extra_usage_tags_lock = threading.Lock()
def _add_to_usage_set(set_name: str, value: str):
    """Add `value` to the usage set `set_name` in the internal kv store.

    A set member is stored as the key `set_name + value` with an empty value
    in the usage stats namespace. Failures of the put are logged at debug
    level and otherwise ignored.
    """
    assert _internal_kv_initialized()
    try:
        member_key = f"{set_name}{value}".encode()
        kv_namespace = usage_constant.USAGE_STATS_NAMESPACE.encode()
        _internal_kv_put(member_key, b"", namespace=kv_namespace)
    except Exception as e:
        logger.debug(f"Failed to add {value} to usage set {set_name}, {e}")
def _get_usage_set(gcs_client, set_name: str) -> Set[str]:
    """Return the members of the usage set `set_name`.

    Members are the kv keys that start with `set_name`, with that prefix
    stripped. An empty set is returned (and the error is logged at debug
    level) if anything goes wrong.
    """
    try:
        prefix_length = len(set_name)
        raw_keys = gcs_client.internal_kv_keys(
            set_name.encode(),
            namespace=usage_constant.USAGE_STATS_NAMESPACE.encode(),
        )
        return {raw_key.decode("utf-8")[prefix_length:] for raw_key in raw_keys}
    except Exception as e:
        logger.debug(f"Failed to get usage set {set_name}, {e}")
        return set()
def _put_library_usage(library_usage: str):
    """Store `library_usage` in the library usage set."""
    library_set_name = usage_constant.LIBRARY_USAGE_SET_NAME
    _add_to_usage_set(library_set_name, library_usage)
def _put_hardware_usage(hardware_usage: str):
    """Store `hardware_usage` in the hardware usage set."""
    hardware_set_name = usage_constant.HARDWARE_USAGE_SET_NAME
    _add_to_usage_set(hardware_set_name, hardware_usage)
def record_extra_usage_tag(
    key: TagKey, value: str, gcs_client: Optional[GcsClient] = None
):
    """Record an extra key/value usage tag.

    An existing value for the same key is overwritten. To record a new kind
    of tag, first add its key to the TagKey enum and then call this function.

    When the tag changes, a synchronous call to the internal kv store is
    made.

    Args:
        key: The key of the tag.
        value: The value of the tag.
        gcs_client: The GCS client used for the KV PUT. Defaults to None, in
            which case the global client of the internal_kv is used.

    Returns:
        None
    """
    tag_name = TagKey.Name(key).lower()

    with _recorded_extra_usage_tags_lock:
        unchanged = _recorded_extra_usage_tags.get(tag_name) == value
        if not unchanged:
            _recorded_extra_usage_tags[tag_name] = value
    if unchanged:
        return

    if gcs_client is None and not _internal_kv_initialized():
        # The tag was recorded before ray.init and no GCS client was given
        # explicitly, so there is nowhere to write it yet.
        return

    _put_extra_usage_tag(tag_name, value, gcs_client)
def _put_extra_usage_tag(key: str, value: str, gcs_client: Optional[GcsClient] = None):
    """Write one extra usage tag to the kv store.

    The tag is stored under the extra-usage-tag prefix in the usage stats
    namespace, through `gcs_client` when one is given and through the
    internal kv otherwise. Failures are logged at debug level and ignored.
    """
    try:
        kv_key = f"{usage_constant.EXTRA_USAGE_TAG_PREFIX}{key}".encode()
        kv_value = value.encode()
        kv_namespace = usage_constant.USAGE_STATS_NAMESPACE.encode()
        if gcs_client is None:
            # No explicit client: fall back to the internal kv.
            assert _internal_kv_initialized()
            _internal_kv_put(kv_key, kv_value, namespace=kv_namespace)
        else:
            gcs_client.internal_kv_put(kv_key, kv_value, namespace=kv_namespace)
    except Exception as e:
        logger.debug(f"Failed to put extra usage tag, {e}")
def record_hardware_usage(hardware_usage: str):
    """Record a hardware usage (e.g. the CPU model in use).

    The internal kv must already be initialized.
    """
    kv_ready = _internal_kv_initialized()
    assert kv_ready
    _put_hardware_usage(hardware_usage)
def record_library_usage(library_usage: str):
    """Record a library usage (e.g. which Ray library is used).

    Each library is handled once per process. It is written to the kv store
    only when the internal kv is initialized and the caller is a driver, a
    worker, or a connected Ray client.
    """
    with _recorded_library_usages_lock:
        seen_before = library_usage in _recorded_library_usages
        if not seen_before:
            _recorded_library_usages.add(library_usage)
    if seen_before:
        return

    if not _internal_kv_initialized():
        # The library was imported before ray.init.
        return

    # Only drivers, ray clients and workers report library usage. Otherwise
    # a library imported from e.g. the API server would be reported as well.
    worker_mode = ray._private.worker.global_worker.mode
    if (
        worker_mode in (ray.SCRIPT_MODE, ray.WORKER_MODE)
        or ray.util.client.ray.is_connected()
    ):
        _put_library_usage(library_usage)
def _put_pre_init_library_usages():
    """Write the library usages recorded so far to the kv store.

    Nothing is written unless the caller is a driver or a connected Ray
    client.
    """
    assert _internal_kv_initialized()
    # NOTE: When a library is imported from a worker, ray is always
    # initialized already, so workers never need this flush.
    is_driver = ray._private.worker.global_worker.mode == ray.SCRIPT_MODE
    if not is_driver and not ray.util.client.ray.is_connected():
        return

    for recorded_usage in _recorded_library_usages:
        _put_library_usage(recorded_usage)
def _put_pre_init_extra_usage_tags():
    """Write the extra usage tags recorded so far to the kv store."""
    assert _internal_kv_initialized()
    for tag_key, tag_value in _recorded_extra_usage_tags.items():
        _put_extra_usage_tag(tag_key, tag_value)
def put_pre_init_usage_stats():
    """Flush the recorded library usages, then the recorded extra tags."""
    for flush in (_put_pre_init_library_usages, _put_pre_init_extra_usage_tags):
        flush()
def reset_global_state():
    """Replace the recorded library usages and extra tags with empty ones."""
    global _recorded_library_usages, _recorded_extra_usage_tags

    with _recorded_library_usages_lock:
        _recorded_library_usages = set()

    with _recorded_extra_usage_tags_lock:
        _recorded_extra_usage_tags = {}
- ray._private.worker._post_init_hooks.append(put_pre_init_usage_stats)
def _usage_stats_report_url():
    """Return the URL of the usage collection server.

    The RAY_USAGE_STATS_REPORT_URL override exists for testing only.
    """
    default_url = "https://usage-stats.ray.io/"
    return os.environ.get("RAY_USAGE_STATS_REPORT_URL", default_url)
def _usage_stats_report_interval_s():
    """Return the report interval in seconds (3600 unless overridden)."""
    configured_interval = os.environ.get("RAY_USAGE_STATS_REPORT_INTERVAL_S", 3600)
    return int(configured_interval)
def _usage_stats_config_path():
    """Return the path of the Ray config file.

    Defaults to `~/.ray/config.json` (user-expanded) unless the
    RAY_USAGE_STATS_CONFIG_PATH environment variable is set.
    """
    default_path = os.path.expanduser("~/.ray/config.json")
    return os.environ.get("RAY_USAGE_STATS_CONFIG_PATH", default_path)
def _usage_stats_enabledness() -> UsageStatsEnabledness:
    """Work out whether usage stats are enabled, and why.

    The environment variable wins over the config file. When neither
    expresses a choice, usage stats are enabled by default.

    Raises:
        ValueError: If the env var is set to something other than 0 or 1, or
            if `usage_stats` in the config file is neither true nor false.
    """
    env_value = os.getenv(usage_constant.USAGE_STATS_ENABLED_ENV_VAR)
    if env_value is not None:
        if env_value == "0":
            return UsageStatsEnabledness.DISABLED_EXPLICITLY
        if env_value == "1":
            return UsageStatsEnabledness.ENABLED_EXPLICITLY
        raise ValueError(
            f"Valid value for {usage_constant.USAGE_STATS_ENABLED_ENV_VAR} "
            f"env var is 0 or 1, but got {env_value}"
        )

    config_value = None
    try:
        with open(_usage_stats_config_path()) as f:
            loaded_config = json.load(f)
        config_value = loaded_config.get("usage_stats")
    except FileNotFoundError:
        # No config file means no choice was persisted.
        pass
    except Exception as e:
        logger.debug(f"Failed to load usage stats config {e}")

    if config_value is None:
        # Usage stats is enabled by default.
        return UsageStatsEnabledness.ENABLED_BY_DEFAULT
    if config_value is False:
        return UsageStatsEnabledness.DISABLED_EXPLICITLY
    if config_value is True:
        return UsageStatsEnabledness.ENABLED_EXPLICITLY
    raise ValueError(
        f"Valid value for 'usage_stats' in {_usage_stats_config_path()}"
        f" is true or false, but got {config_value}"
    )
def is_nightly_wheel() -> bool:
    """Return True when the commit placeholder is filled in and the version contains "dev"."""
    commit_is_filled_in = ray.__commit__ != "{{RAY_COMMIT_SHA}}"
    return commit_is_filled_in and "dev" in ray.__version__
def usage_stats_enabled() -> bool:
    """Return True unless usage stats have been explicitly disabled."""
    enabledness = _usage_stats_enabledness()
    return enabledness is not UsageStatsEnabledness.DISABLED_EXPLICITLY
def usage_stats_prompt_enabled():
    """Return True when RAY_USAGE_STATS_PROMPT_ENABLED is unset or equals 1."""
    prompt_flag = os.environ.get("RAY_USAGE_STATS_PROMPT_ENABLED", "1")
    return int(prompt_flag) == 1
def _generate_cluster_metadata(*, ray_init_cluster: bool):
    """Build the dictionary of cluster metadata.

    Params:
        ray_init_cluster: Whether the cluster is started by ray.init()

    Returns:
        A dictionary of cluster metadata.
    """
    ray_version, python_version = ray._private.utils.compute_version_info()
    # The versions are recorded even when usage report is disabled because
    # they are needed to check version compatibility.
    metadata = {
        "ray_version": ray_version,
        "python_version": python_version,
        "ray_init_cluster": ray_init_cluster,
    }
    if not usage_stats_enabled():
        return metadata

    # Everything below is recorded only when usage stats are enabled.
    metadata["git_commit"] = ray.__commit__
    metadata["os"] = sys.platform
    metadata["session_start_timestamp_ms"] = int(time.time() * 1000)
    if sys.platform == "linux":
        # Record the libc version.
        lib, ver = platform.libc_ver()
        metadata["libc_version"] = f"{lib}:{ver}" if lib else "NA"
    return metadata
def show_usage_stats_prompt(cli: bool) -> None:
    """Print the usage stats notice, asking for confirmation when possible.

    Params:
        cli: True to print through the CLI logger (and, in an interactive
            session with no explicit choice yet, ask the user to confirm);
            False to use the builtin `print`.
    """
    if not usage_stats_prompt_enabled():
        return

    from ray.autoscaler._private.cli_logger import cli_logger

    emit = cli_logger.print if cli else print
    enabledness = _usage_stats_enabledness()

    if enabledness is UsageStatsEnabledness.DISABLED_EXPLICITLY:
        emit(usage_constant.USAGE_STATS_DISABLED_MESSAGE)
        return

    if enabledness is not UsageStatsEnabledness.ENABLED_BY_DEFAULT:
        assert enabledness is UsageStatsEnabledness.ENABLED_EXPLICITLY
        if cli:
            emit(usage_constant.USAGE_STATS_ENABLED_FOR_CLI_MESSAGE)
        else:
            emit(usage_constant.USAGE_STATS_ENABLED_FOR_RAY_INIT_MESSAGE)
        return

    # From here on usage stats are enabled by default (no explicit choice).
    if not cli:
        emit(usage_constant.USAGE_STATS_ENABLED_BY_DEFAULT_FOR_RAY_INIT_MESSAGE)
        return

    if not cli_logger.interactive:
        emit(usage_constant.USAGE_STATS_ENABLED_BY_DEFAULT_FOR_CLI_MESSAGE)
        return

    enabled = cli_logger.confirm(
        False,
        usage_constant.USAGE_STATS_CONFIRMATION_MESSAGE,
        _default=True,
        _timeout_s=10,
    )
    set_usage_stats_enabled_via_env_var(enabled)
    # Remember the user's choice; failing to persist it is not fatal.
    try:
        set_usage_stats_enabled_via_config(enabled)
    except Exception as e:
        logger.debug(
            f"Failed to persist usage stats choice for future clusters: {e}"
        )
    if enabled:
        emit(usage_constant.USAGE_STATS_ENABLED_FOR_CLI_MESSAGE)
    else:
        emit(usage_constant.USAGE_STATS_DISABLED_MESSAGE)
def set_usage_stats_enabled_via_config(enabled) -> None:
    """Persist the usage stats choice in the Ray config file.

    The existing config is loaded (a missing, unreadable or non-dict config
    is treated as empty), `usage_stats` is set to `enabled`, and the result
    is written back.

    Raises:
        Exception: If the config file cannot be written.
    """
    config_path = _usage_stats_config_path()

    config = {}
    try:
        with open(config_path) as f:
            config = json.load(f)
        if not isinstance(config, dict):
            logger.debug(
                f"Invalid ray config file, should be a json dict but got {type(config)}"
            )
            config = {}
    except FileNotFoundError:
        # Nothing persisted yet; start from an empty config.
        pass
    except Exception as e:
        logger.debug(f"Failed to load ray config file {e}")

    config["usage_stats"] = enabled

    try:
        os.makedirs(os.path.dirname(config_path), exist_ok=True)
        with open(config_path, "w") as f:
            json.dump(config, f)
    except Exception as e:
        action = "enable" if enabled else "disable"
        json_literal = "true" if enabled else "false"
        raise Exception(
            "Failed to "
            f"{action}"
            ' usage stats by writing {"usage_stats": '
            f"{json_literal}"
            "} to "
            f"{config_path}"
        ) from e
def set_usage_stats_enabled_via_env_var(enabled) -> None:
    """Set the usage stats env var of this process to "1" or "0"."""
    flag = "1" if enabled else "0"
    os.environ[usage_constant.USAGE_STATS_ENABLED_ENV_VAR] = flag
def put_cluster_metadata(gcs_client: GcsClient, *, ray_init_cluster: bool) -> dict:
    """Generate the cluster metadata and store it in GCS.

    This is a blocking call. An existing entry is overwritten.

    Params:
        gcs_client: The GCS client to perform KV operation PUT.
        ray_init_cluster: Whether the cluster is started by ray.init()

    Raises:
        gRPC exceptions: If PUT fails.

    Returns:
        The cluster metadata.
    """
    cluster_metadata = _generate_cluster_metadata(ray_init_cluster=ray_init_cluster)
    serialized_metadata = json.dumps(cluster_metadata).encode()
    gcs_client.internal_kv_put(
        usage_constant.CLUSTER_METADATA_KEY,
        serialized_metadata,
        overwrite=True,
        namespace=ray_constants.KV_NAMESPACE_CLUSTER,
    )
    return cluster_metadata
def get_total_num_running_jobs_to_report(gcs_client) -> Optional[int]:
    """Count the running jobs in the cluster, leaving out internal ones.

    A job counts when it is not dead and its namespace does not start with
    "_ray_internal". Returns None (after logging) if the query fails.
    """
    try:
        all_jobs = gcs_client.get_all_job_info(
            skip_submission_job_info_field=True, skip_is_running_tasks_field=True
        )
        return sum(
            1
            for job in all_jobs.values()
            if not (
                job.is_dead
                or job.config.ray_namespace.startswith("_ray_internal")
            )
        )
    except Exception as e:
        logger.info(f"Failed to query number of running jobs in the cluster: {e}")
        return None
def get_total_num_alive_nodes_to_report(gcs_client, timeout=None) -> Optional[int]:
    """Count the alive nodes in the cluster.

    Returns None (after logging) if the query fails.
    """
    try:
        alive_nodes = gcs_client.get_all_node_info(
            timeout=timeout, state_filter=gcs_pb2.GcsNodeInfo.GcsNodeState.ALIVE
        )
        num_alive_nodes = len(alive_nodes.items())
        return num_alive_nodes
    except Exception as e:
        logger.info(f"Failed to query number of nodes in the cluster: {e}")
        return None
def get_library_usages_to_report(gcs_client) -> List[str]:
    """Return the recorded library usages as a list."""
    library_usages = _get_usage_set(gcs_client, usage_constant.LIBRARY_USAGE_SET_NAME)
    return list(library_usages)
def get_hardware_usages_to_report(gcs_client) -> List[str]:
    """Return the recorded hardware usages as a list."""
    hardware_usages = _get_usage_set(gcs_client, usage_constant.HARDWARE_USAGE_SET_NAME)
    return list(hardware_usages)
def get_extra_usage_tags_to_report(gcs_client: GcsClient) -> Dict[str, str]:
    """Get the extra usage tags from env var and gcs kv store.

    The env var should be given this way; key=value;key=value.
    If parsing the env var fails, none of its pairs are reported (not even
    the ones that preceded the malformed pair).

    Tags read from the kv store override env var tags with the same key. If
    reading the kv store fails, or a stored key is not a known `TagKey`, the
    error is logged and the tags collected up to that point are returned.

    Params:
        gcs_client: The GCS client.

    Returns:
        Extra usage tags as kv pairs.
    """
    extra_usage_tags = {}

    extra_usage_tags_env_var = os.getenv("RAY_USAGE_STATS_EXTRA_TAGS", None)
    if extra_usage_tags_env_var:
        # Parse into a scratch dict so that a malformed pair cannot leave a
        # partially parsed env var in the result.
        tags_from_env = {}
        try:
            for kv in extra_usage_tags_env_var.strip(";").split(";"):
                k, v = kv.split("=")
                tags_from_env[k] = v
        except Exception as e:
            logger.info(f"Failed to parse extra usage tags env var: {e}")
        else:
            extra_usage_tags.update(tags_from_env)

    valid_tag_keys = {tag_key.lower() for tag_key in TagKey.keys()}
    try:
        namespace = usage_constant.USAGE_STATS_NAMESPACE.encode()
        keys = gcs_client.internal_kv_keys(
            usage_constant.EXTRA_USAGE_TAG_PREFIX.encode(),
            namespace=namespace,
        )
        kv = gcs_client.internal_kv_multi_get(keys, namespace=namespace)
        prefix_length = len(usage_constant.EXTRA_USAGE_TAG_PREFIX)
        for key, value in kv.items():
            key = key.decode("utf-8")[prefix_length:]
            # An explicit check instead of `assert`, so that the validation
            # is not stripped when Python runs with -O.
            if key not in valid_tag_keys:
                raise ValueError(f"Unknown extra usage tag key: {key}")
            extra_usage_tags[key] = value.decode("utf-8")
    except Exception as e:
        logger.info(f"Failed to get extra usage tags from kv store: {e}")
    return extra_usage_tags
def _get_cluster_status_to_report_v2(gcs_client: GcsClient) -> ClusterStatusToReport:
    """
    Get the current status of this cluster. A temporary proxy for the
    autoscaler v2 API.

    It is a blocking API.

    Params:
        gcs_client: The GCS client.

    Returns:
        The current cluster status. If fetching it fails, the error is
        logged and the fields that were not filled in yet stay None.
    """
    from ray.autoscaler.v2.sdk import get_cluster_status

    result = ClusterStatusToReport()
    try:
        cluster_status = get_cluster_status(gcs_client.address)
        total_resources = cluster_status.total_resources()
        result.total_num_cpus = int(total_resources.get("CPU", 0))
        result.total_num_gpus = int(total_resources.get("GPU", 0))

        to_GiB = 1 / 2**30
        result.total_memory_gb = total_resources.get("memory", 0) * to_GiB
        result.total_object_store_memory_gb = (
            total_resources.get("object_store_memory", 0) * to_GiB
        )
    except Exception as e:
        logger.info(f"Failed to get cluster status to report {e}")
    # Return after the try statement rather than from a `finally` clause: a
    # `return` inside `finally` would also swallow KeyboardInterrupt and
    # SystemExit.
    return result
def get_cluster_status_to_report(gcs_client: GcsClient) -> ClusterStatusToReport:
    """Get the current status of this cluster.

    It is a blocking API.

    Params:
        gcs_client: The GCS client to perform KV operation GET.

    Returns:
        The current cluster status or empty if it fails to get that information.
    """
    try:
        from ray.autoscaler.v2.utils import is_autoscaler_v2

        if is_autoscaler_v2():
            return _get_cluster_status_to_report_v2(gcs_client)

        raw_status = gcs_client.internal_kv_get(
            ray._private.ray_constants.DEBUG_AUTOSCALING_STATUS.encode(),
            namespace=None,
        )
        if not raw_status:
            return ClusterStatusToReport()

        result = ClusterStatusToReport()
        to_GiB = 1 / 2**30
        parsed_status = json.loads(raw_status.decode("utf-8"))
        has_usage = (
            "load_metrics_report" in parsed_status
            and "usage" in parsed_status["load_metrics_report"]
        )
        if not has_usage:
            return ClusterStatusToReport()

        # usage is a map from resource to (used, total) pair
        usage = parsed_status["load_metrics_report"]["usage"]
        # (resource name, result field, conversion applied to the total)
        extractors = (
            ("CPU", "total_num_cpus", int),
            ("GPU", "total_num_gpus", int),
            ("memory", "total_memory_gb", lambda total: total * to_GiB),
            (
                "object_store_memory",
                "total_object_store_memory_gb",
                lambda total: total * to_GiB,
            ),
        )
        for resource, field_name, convert in extractors:
            if resource in usage:
                setattr(result, field_name, convert(usage[resource][1]))
        return result
    except Exception as e:
        logger.info(f"Failed to get cluster status to report {e}")
        return ClusterStatusToReport()
def get_cloud_from_metadata_requests() -> str:
    """Guess the cloud provider by probing the providers' metadata endpoints.

    Azure, GCP and AWS are probed in that order, each with a 1 second
    timeout.

    Returns:
        "azure", "gcp" or "aws" for the first endpoint answering with
        200 OK, "unknown" if none does.
    """

    def cloud_metadata_request(url: str, headers: Optional[Dict[str, str]]) -> bool:
        try:
            response = requests.get(url, headers=headers, timeout=1)
            # Anything but 200 OK is rejected to avoid false positives such
            # as 400 - Bad Request when several cloud providers share the
            # same IP (169.254.169.254).
            return response.status_code == 200
        # ConnectionError is a superclass of ConnectTimeout
        except requests.exceptions.ConnectionError:
            return False
        except Exception as e:
            logger.info(
                f"Unexpected exception when making cloud provider metadata request: {e}"
            )
            return False

    # (provider name, metadata URL, request headers), in probing order.
    probes = (
        (
            "azure",
            "http://169.254.169.254/metadata/instance?api-version=2021-12-13",
            {"Metadata": "true"},
        ),
        (
            "gcp",
            "http://metadata.google.internal/computeMetadata/v1",
            {"Metadata-Flavor": "Google"},
        ),
        ("aws", "http://169.254.169.254/latest/meta-data/", None),
    )
    for provider, metadata_url, metadata_headers in probes:
        if cloud_metadata_request(metadata_url, metadata_headers):
            return provider
    return "unknown"
def get_cluster_config_to_report(
    cluster_config_file_path: str,
) -> ClusterConfigToReport:
    """Get the static cluster (autoscaler) config used to launch this cluster.

    When the config file does not exist, the cloud provider is derived from
    the Kubernetes environment variables and the cloud metadata endpoints
    instead.

    Params:
        cluster_config_file_path: The file path to the cluster config file.

    Returns:
        The cluster (autoscaler) config or empty if it fails to get that information.
    """

    def get_instance_type(node_config):
        if not node_config:
            return None
        # "InstanceType" is used by aws, "machineType" by gcp.
        for provider_key in ("InstanceType", "machineType"):
            if provider_key in node_config:
                return node_config[provider_key]
        if (
            "azure_arm_parameters" in node_config
            and "vmSize" in node_config["azure_arm_parameters"]
        ):
            return node_config["azure_arm_parameters"]["vmSize"]
        return None

    try:
        with open(cluster_config_file_path) as f:
            config = yaml.safe_load(f)

        result = ClusterConfigToReport()
        if "min_workers" in config:
            result.min_workers = config["min_workers"]
        if "max_workers" in config:
            result.max_workers = config["max_workers"]
        if "provider" in config and "type" in config["provider"]:
            result.cloud_provider = config["provider"]["type"]

        if "head_node_type" not in config or "available_node_types" not in config:
            return result

        head_node_type = config["head_node_type"]
        available_node_types = config["available_node_types"]
        worker_instance_types = set()
        for node_type_name in available_node_types:
            instance_type = get_instance_type(
                available_node_types[node_type_name].get("node_config")
            )
            if not instance_type:
                continue
            if node_type_name == head_node_type:
                result.head_node_instance_type = instance_type
            else:
                worker_instance_types.add(instance_type)

        if worker_instance_types:
            result.worker_node_instance_types = list(worker_instance_types)
        return result
    except FileNotFoundError:
        # It's a manually started cluster or k8s cluster
        kubernetes_provider = None
        if usage_constant.KUBERNETES_SERVICE_HOST_ENV in os.environ:
            if usage_constant.KUBERAY_ENV in os.environ:
                # KubeRay >= 0.4.0.
                kubernetes_provider = usage_constant.PROVIDER_KUBERAY
            else:
                # On Kubernetes, but not launched through KubeRay.
                kubernetes_provider = usage_constant.PROVIDER_KUBERNETES_GENERIC

        result = ClusterConfigToReport()
        result.cloud_provider = kubernetes_provider
        detected_cloud = get_cloud_from_metadata_requests()
        if result.cloud_provider is None:
            result.cloud_provider = detected_cloud
        else:
            # NOTE(review): the "$" is emitted literally (e.g. "<k8s>_$aws");
            # it looks like a leftover from another templating syntax.
            # Confirm with the consumers of this field before removing it.
            result.cloud_provider += f"_${detected_cloud}"
        return result
    except Exception as e:
        logger.info(f"Failed to get cluster config to report {e}")
        return ClusterConfigToReport()
def get_cluster_metadata(gcs_client: GcsClient) -> Optional[dict]:
    """Get the cluster metadata from GCS.

    It is a blocking API.

    This will return None if `put_cluster_metadata` was never called.

    Params:
        gcs_client: The GCS client to perform KV operation GET.

    Returns:
        The cluster metadata in a dictionary, or None if no metadata is
        stored.

    Raises:
        Any exception raised by the GCS client lookup is propagated, as is
        the JSON decoding error for a malformed stored value.
    """
    serialized_metadata = gcs_client.internal_kv_get(
        usage_constant.CLUSTER_METADATA_KEY,
        namespace=ray_constants.KV_NAMESPACE_CLUSTER,
    )
    # NOTE(review): assumes the client returns None for a missing key --
    # confirm against GcsClient. Without this guard the documented "return
    # None" case raised AttributeError on `.decode`.
    if serialized_metadata is None:
        return None
    return json.loads(serialized_metadata.decode("utf-8"))
def is_ray_init_cluster(gcs_client: ray._raylet.GcsClient) -> bool:
    """Tell whether the cluster was started by ray.init().

    The answer is the `ray_init_cluster` entry of the cluster metadata.
    """
    return get_cluster_metadata(gcs_client)["ray_init_cluster"]
def generate_disabled_report_data() -> UsageStatsToReport:
    """Build the report that is used when usage stats are disabled.

    Only the schema version, the source and the collection timestamp are
    filled in.
    """
    report_source = os.getenv(
        usage_constant.USAGE_STATS_SOURCE_ENV_VAR,
        usage_constant.USAGE_STATS_SOURCE_OSS,
    )
    now_ms = int(time.time() * 1000)
    return UsageStatsToReport(
        schema_version=usage_constant.SCHEMA_VERSION,
        source=report_source,
        collect_timestamp_ms=now_ms,
    )
def generate_report_data(
    cluster_config_to_report: ClusterConfigToReport,
    total_success: int,
    total_failed: int,
    seq_number: int,
    gcs_address: str,
    cluster_id: str,
) -> UsageStatsToReport:
    """Assemble the usage report for the cluster.

    Params:
        cluster_config_to_report: The cluster (autoscaler)
            config generated by `get_cluster_config_to_report`.
        total_success: The total number of successful report
            for the lifetime of the cluster.
        total_failed: The total number of failed report
            for the lifetime of the cluster.
        seq_number: The sequence number that's incremented whenever
            a new report is sent.
        gcs_address: the address of gcs to get data to report.
        cluster_id: hex id of the cluster.

    Returns:
        UsageStats
    """
    assert cluster_id

    # A dedicated client is created for the GCS at `gcs_address`.
    gcs_client = ray._raylet.GcsClient(address=gcs_address, cluster_id=cluster_id)
    metadata = get_cluster_metadata(gcs_client)
    status = get_cluster_status_to_report(gcs_client)
    config = cluster_config_to_report

    report_source = os.getenv(
        usage_constant.USAGE_STATS_SOURCE_ENV_VAR,
        usage_constant.USAGE_STATS_SOURCE_OSS,
    )
    return UsageStatsToReport(
        schema_version=usage_constant.SCHEMA_VERSION,
        source=report_source,
        collect_timestamp_ms=int(time.time() * 1000),
        total_success=total_success,
        total_failed=total_failed,
        seq_number=seq_number,
        # Recorded when the cluster metadata was stored.
        ray_version=metadata["ray_version"],
        python_version=metadata["python_version"],
        session_id=cluster_id,
        git_commit=metadata["git_commit"],
        os=metadata["os"],
        session_start_timestamp_ms=metadata["session_start_timestamp_ms"],
        # Static cluster (autoscaler) config.
        cloud_provider=config.cloud_provider,
        min_workers=config.min_workers,
        max_workers=config.max_workers,
        head_node_instance_type=config.head_node_instance_type,
        worker_node_instance_types=config.worker_node_instance_types,
        # Current resource totals.
        total_num_cpus=status.total_num_cpus,
        total_num_gpus=status.total_num_gpus,
        total_memory_gb=status.total_memory_gb,
        total_object_store_memory_gb=status.total_object_store_memory_gb,
        # Queried from GCS while the report is built.
        library_usages=get_library_usages_to_report(gcs_client),
        extra_usage_tags=get_extra_usage_tags_to_report(gcs_client),
        total_num_nodes=get_total_num_alive_nodes_to_report(gcs_client),
        total_num_running_jobs=get_total_num_running_jobs_to_report(gcs_client),
        libc_version=metadata.get("libc_version"),
        hardware_usages=get_hardware_usages_to_report(gcs_client),
    )
def generate_write_data(
    usage_stats: UsageStatsToReport,
    error: Optional[str],
) -> UsageStatsToWrite:
    """Generate the data to write to the usage stats file.

    Params:
        usage_stats: The usage stats that were reported.
        error: The error message of the failed report, or None if the
            report succeeded.

    Returns:
        UsageStatsToWrite, with `success` set to True exactly when `error`
        is None.
    """
    data = UsageStatsToWrite(
        usage_stats=usage_stats,
        success=error is None,
        error=error,
    )
    return data
class UsageReportClient:
    """The client implementation for usage report.

    It is in charge of writing usage stats to the directory
    and report usage stats.
    """

    def write_usage_data(self, data: UsageStatsToWrite, dir_path: str) -> None:
        """Write the usage data to the directory.

        The data is serialized as JSON into a temporary file next to the
        destination, which then replaces the destination file.

        Params:
            data: Data to report
            dir_path: The path to the directory to write usage data.
        """
        dir_path = Path(dir_path)
        destination = dir_path / usage_constant.USAGE_STATS_FILE
        temp = dir_path / f"{usage_constant.USAGE_STATS_FILE}.tmp"
        with temp.open(mode="w") as json_file:
            json_file.write(json.dumps(asdict(data)))
        # `Path.replace` overwrites an existing destination on every
        # platform, Windows included, so the destination no longer has to be
        # unlinked first (which left a window with no usage stats file).
        temp.replace(destination)

    def report_usage_data(self, url: str, data: UsageStatsToReport) -> requests.Response:
        """Report the usage data to the usage server.

        Params:
            url: The URL to update resource usage.
            data: Data to report.

        Returns:
            The response of the usage server.

        Raises:
            requests.HTTPError: If requests fails.
        """
        r = requests.request(
            "POST",
            url,
            headers={"Content-Type": "application/json"},
            json=asdict(data),
            timeout=10,
        )
        r.raise_for_status()
        return r
|