usage_lib.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016
  1. """This is the module that is in charge of Ray usage report (telemetry) APIs.
  2. NOTE: Ray's usage report is currently "on by default".
  3. One could opt-out, see details at https://docs.ray.io/en/master/cluster/usage-stats.html. # noqa
  4. Ray usage report follows the specification from
  5. https://docs.ray.io/en/master/cluster/usage-stats.html#usage-stats-collection # noqa
  6. # Module
  7. The module consists of 2 parts.
  8. ## Public API
  9. It contains public APIs to obtain usage report information.
  10. APIs will be added before the usage report becomes opt-in by default.
  11. ## Internal APIs for usage processing/report
  12. The telemetry report consists of 5 components. This module is in charge of the top 2 layers.
  13. Report -> usage_lib
  14. ---------------------
  15. Usage data processing -> usage_lib
  16. ---------------------
  17. Data storage -> Ray API server
  18. ---------------------
  19. Aggregation -> Ray API server (currently a dashboard server)
  20. ---------------------
  21. Usage data collection -> Various components (Ray agent, GCS, etc.) + usage_lib (cluster metadata).
  22. Usage report is currently "on by default". You can explicitly enable or disable the report by setting the
  23. environment variable RAY_USAGE_STATS_ENABLED to 1 or 0. For example, `RAY_USAGE_STATS_ENABLED=1 ray start --head`.
  24. Or `RAY_USAGE_STATS_ENABLED=1 python [drivers with ray.init()]`.
  25. "Ray API server (currently a dashboard server)" reports the usage data to https://usage-stats.ray.io/.
  26. Data is reported every hour by default.
  27. Note that it is also possible to configure the interval using the environment variable,
  28. `RAY_USAGE_STATS_REPORT_INTERVAL_S`.
  29. To see collected/reported data, see `usage_stats.json` inside a temp
  30. folder (e.g., /tmp/ray/session_[id]/*).
  31. """
  32. import json
  33. import logging
  34. import os
  35. import platform
  36. import sys
  37. import threading
  38. import time
  39. from dataclasses import asdict, dataclass
  40. from enum import Enum, auto
  41. from pathlib import Path
  42. from typing import Dict, List, Optional, Set
  43. import requests
  44. import yaml
  45. import ray
  46. import ray._common.usage.usage_constants as usage_constant
  47. import ray._private.ray_constants as ray_constants
  48. from ray._raylet import GcsClient
  49. from ray.core.generated import gcs_pb2, usage_pb2
  50. from ray.experimental.internal_kv import (
  51. _internal_kv_initialized,
  52. _internal_kv_put,
  53. )
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Alias for the generated TagKey enum; used as the key type of extra usage tags.
TagKey = usage_pb2.TagKey
  56. #################
  57. # Internal APIs #
  58. #################
  59. @dataclass(init=True)
  60. class ClusterConfigToReport:
  61. cloud_provider: Optional[str] = None
  62. min_workers: Optional[int] = None
  63. max_workers: Optional[int] = None
  64. head_node_instance_type: Optional[str] = None
  65. worker_node_instance_types: Optional[List[str]] = None
  66. @dataclass(init=True)
  67. class ClusterStatusToReport:
  68. total_num_cpus: Optional[int] = None
  69. total_num_gpus: Optional[int] = None
  70. total_memory_gb: Optional[float] = None
  71. total_object_store_memory_gb: Optional[float] = None
  72. @dataclass(init=True)
  73. class UsageStatsToReport:
  74. """Usage stats to report"""
  75. #: The schema version of the report.
  76. schema_version: str
  77. #: The source of the data (i.e. OSS).
  78. source: str
  79. #: When the data is collected and reported.
  80. collect_timestamp_ms: int
  81. #: The total number of successful reports for the lifetime of the cluster.
  82. total_success: Optional[int] = None
  83. #: The total number of failed reports for the lifetime of the cluster.
  84. total_failed: Optional[int] = None
  85. #: The sequence number of the report.
  86. seq_number: Optional[int] = None
  87. #: The Ray version in use.
  88. ray_version: Optional[str] = None
  89. #: The Python version in use.
  90. python_version: Optional[str] = None
  91. #: A random id of the cluster session.
  92. session_id: Optional[str] = None
  93. #: The git commit hash of Ray (i.e. ray.__commit__).
  94. git_commit: Optional[str] = None
  95. #: The operating system in use.
  96. os: Optional[str] = None
  97. #: When the cluster is started.
  98. session_start_timestamp_ms: Optional[int] = None
  99. #: The cloud provider found in the cluster.yaml file (e.g., aws).
  100. cloud_provider: Optional[str] = None
  101. #: The min_workers found in the cluster.yaml file.
  102. min_workers: Optional[int] = None
  103. #: The max_workers found in the cluster.yaml file.
  104. max_workers: Optional[int] = None
  105. #: The head node instance type found in the cluster.yaml file (e.g., i3.8xlarge).
  106. head_node_instance_type: Optional[str] = None
  107. #: The worker node instance types found in the cluster.yaml file (e.g., i3.8xlarge).
  108. worker_node_instance_types: Optional[List[str]] = None
  109. #: The total num of cpus in the cluster.
  110. total_num_cpus: Optional[int] = None
  111. #: The total num of gpus in the cluster.
  112. total_num_gpus: Optional[int] = None
  113. #: The total size of memory in the cluster.
  114. total_memory_gb: Optional[float] = None
  115. #: The total size of object store memory in the cluster.
  116. total_object_store_memory_gb: Optional[float] = None
  117. #: The Ray libraries that are used (e.g., rllib).
  118. library_usages: Optional[List[str]] = None
  119. #: The extra tags to report when specified by an
  120. # environment variable RAY_USAGE_STATS_EXTRA_TAGS
  121. extra_usage_tags: Optional[Dict[str, str]] = None
  122. #: The number of alive nodes when the report is generated.
  123. total_num_nodes: Optional[int] = None
  124. #: The total number of running jobs excluding internal ones
  125. # when the report is generated.
  126. total_num_running_jobs: Optional[int] = None
  127. #: The libc version in the OS.
  128. libc_version: Optional[str] = None
  129. #: The hardwares that are used (e.g. Intel Xeon).
  130. hardware_usages: Optional[List[str]] = None
  131. @dataclass(init=True)
  132. class UsageStatsToWrite:
  133. """Usage stats to write to `USAGE_STATS_FILE`
  134. We are writing extra metadata such as the status of report
  135. to this file.
  136. """
  137. usage_stats: UsageStatsToReport
  138. # Whether or not the last report succeeded.
  139. success: bool
  140. # The error message of the last report if it happens.
  141. error: str
  142. class UsageStatsEnabledness(Enum):
  143. ENABLED_EXPLICITLY = auto()
  144. DISABLED_EXPLICITLY = auto()
  145. ENABLED_BY_DEFAULT = auto()
  146. _recorded_library_usages = set()
  147. _recorded_library_usages_lock = threading.Lock()
  148. _recorded_extra_usage_tags = dict()
  149. _recorded_extra_usage_tags_lock = threading.Lock()
  150. def _add_to_usage_set(set_name: str, value: str):
  151. assert _internal_kv_initialized()
  152. try:
  153. _internal_kv_put(
  154. f"{set_name}{value}".encode(),
  155. b"",
  156. namespace=usage_constant.USAGE_STATS_NAMESPACE.encode(),
  157. )
  158. except Exception as e:
  159. logger.debug(f"Failed to add {value} to usage set {set_name}, {e}")
  160. def _get_usage_set(gcs_client, set_name: str) -> Set[str]:
  161. try:
  162. result = set()
  163. usages = gcs_client.internal_kv_keys(
  164. set_name.encode(),
  165. namespace=usage_constant.USAGE_STATS_NAMESPACE.encode(),
  166. )
  167. for usage in usages:
  168. usage = usage.decode("utf-8")
  169. result.add(usage[len(set_name) :])
  170. return result
  171. except Exception as e:
  172. logger.debug(f"Failed to get usage set {set_name}, {e}")
  173. return set()
  174. def _put_library_usage(library_usage: str):
  175. _add_to_usage_set(usage_constant.LIBRARY_USAGE_SET_NAME, library_usage)
  176. def _put_hardware_usage(hardware_usage: str):
  177. _add_to_usage_set(usage_constant.HARDWARE_USAGE_SET_NAME, hardware_usage)
  178. def record_extra_usage_tag(
  179. key: TagKey, value: str, gcs_client: Optional[GcsClient] = None
  180. ):
  181. """Record extra kv usage tag.
  182. If the key already exists, the value will be overwritten.
  183. To record an extra tag, first add the key to the TagKey enum and
  184. then call this function.
  185. It will make a synchronous call to the internal kv store if the tag is updated.
  186. Args:
  187. key: The key of the tag.
  188. value: The value of the tag.
  189. gcs_client: The GCS client to perform KV operation PUT. Defaults to None.
  190. When None, it will try to get the global client from the internal_kv.
  191. Returns:
  192. None
  193. """
  194. key = TagKey.Name(key).lower()
  195. with _recorded_extra_usage_tags_lock:
  196. if _recorded_extra_usage_tags.get(key) == value:
  197. return
  198. _recorded_extra_usage_tags[key] = value
  199. if not _internal_kv_initialized() and gcs_client is None:
  200. # This happens if the record is before ray.init and
  201. # no GCS client is used for recording explicitly.
  202. return
  203. _put_extra_usage_tag(key, value, gcs_client)
  204. def _put_extra_usage_tag(key: str, value: str, gcs_client: Optional[GcsClient] = None):
  205. try:
  206. key = f"{usage_constant.EXTRA_USAGE_TAG_PREFIX}{key}".encode()
  207. val = value.encode()
  208. namespace = usage_constant.USAGE_STATS_NAMESPACE.encode()
  209. if gcs_client is not None:
  210. # Use the GCS client.
  211. gcs_client.internal_kv_put(key, val, namespace=namespace)
  212. else:
  213. # Use internal kv.
  214. assert _internal_kv_initialized()
  215. _internal_kv_put(key, val, namespace=namespace)
  216. except Exception as e:
  217. logger.debug(f"Failed to put extra usage tag, {e}")
  218. def record_hardware_usage(hardware_usage: str):
  219. """Record hardware usage (e.g. which CPU model is used)"""
  220. assert _internal_kv_initialized()
  221. _put_hardware_usage(hardware_usage)
  222. def record_library_usage(library_usage: str):
  223. """Record library usage (e.g. which library is used)"""
  224. with _recorded_library_usages_lock:
  225. if library_usage in _recorded_library_usages:
  226. return
  227. _recorded_library_usages.add(library_usage)
  228. if not _internal_kv_initialized():
  229. # This happens if the library is imported before ray.init
  230. return
  231. # Only report lib usage for driver / ray client / workers. Otherwise,
  232. # it can be reported if the library is imported from
  233. # e.g., API server.
  234. if (
  235. ray._private.worker.global_worker.mode == ray.SCRIPT_MODE
  236. or ray._private.worker.global_worker.mode == ray.WORKER_MODE
  237. or ray.util.client.ray.is_connected()
  238. ):
  239. _put_library_usage(library_usage)
  240. def _put_pre_init_library_usages():
  241. assert _internal_kv_initialized()
  242. # NOTE: When the lib is imported from a worker, ray should
  243. # always be initialized, so there's no need to register the
  244. # pre init hook.
  245. if not (
  246. ray._private.worker.global_worker.mode == ray.SCRIPT_MODE
  247. or ray.util.client.ray.is_connected()
  248. ):
  249. return
  250. for library_usage in _recorded_library_usages:
  251. _put_library_usage(library_usage)
  252. def _put_pre_init_extra_usage_tags():
  253. assert _internal_kv_initialized()
  254. for k, v in _recorded_extra_usage_tags.items():
  255. _put_extra_usage_tag(k, v)
  256. def put_pre_init_usage_stats():
  257. _put_pre_init_library_usages()
  258. _put_pre_init_extra_usage_tags()
  259. def reset_global_state():
  260. global _recorded_library_usages, _recorded_extra_usage_tags
  261. with _recorded_library_usages_lock:
  262. _recorded_library_usages = set()
  263. with _recorded_extra_usage_tags_lock:
  264. _recorded_extra_usage_tags = dict()
  265. ray._private.worker._post_init_hooks.append(put_pre_init_usage_stats)
  266. def _usage_stats_report_url():
  267. # The usage collection server URL.
  268. # The environment variable is testing-purpose only.
  269. return os.getenv("RAY_USAGE_STATS_REPORT_URL", "https://usage-stats.ray.io/")
  270. def _usage_stats_report_interval_s():
  271. return int(os.getenv("RAY_USAGE_STATS_REPORT_INTERVAL_S", 3600))
  272. def _usage_stats_config_path():
  273. return os.getenv(
  274. "RAY_USAGE_STATS_CONFIG_PATH", os.path.expanduser("~/.ray/config.json")
  275. )
  276. def _usage_stats_enabledness() -> UsageStatsEnabledness:
  277. # Env var has higher priority than config file.
  278. usage_stats_enabled_env_var = os.getenv(usage_constant.USAGE_STATS_ENABLED_ENV_VAR)
  279. if usage_stats_enabled_env_var == "0":
  280. return UsageStatsEnabledness.DISABLED_EXPLICITLY
  281. elif usage_stats_enabled_env_var == "1":
  282. return UsageStatsEnabledness.ENABLED_EXPLICITLY
  283. elif usage_stats_enabled_env_var is not None:
  284. raise ValueError(
  285. f"Valid value for {usage_constant.USAGE_STATS_ENABLED_ENV_VAR} "
  286. f"env var is 0 or 1, but got {usage_stats_enabled_env_var}"
  287. )
  288. usage_stats_enabled_config_var = None
  289. try:
  290. with open(_usage_stats_config_path()) as f:
  291. config = json.load(f)
  292. usage_stats_enabled_config_var = config.get("usage_stats")
  293. except FileNotFoundError:
  294. pass
  295. except Exception as e:
  296. logger.debug(f"Failed to load usage stats config {e}")
  297. if usage_stats_enabled_config_var is False:
  298. return UsageStatsEnabledness.DISABLED_EXPLICITLY
  299. elif usage_stats_enabled_config_var is True:
  300. return UsageStatsEnabledness.ENABLED_EXPLICITLY
  301. elif usage_stats_enabled_config_var is not None:
  302. raise ValueError(
  303. f"Valid value for 'usage_stats' in {_usage_stats_config_path()}"
  304. f" is true or false, but got {usage_stats_enabled_config_var}"
  305. )
  306. # Usage stats is enabled by default.
  307. return UsageStatsEnabledness.ENABLED_BY_DEFAULT
  308. def is_nightly_wheel() -> bool:
  309. return ray.__commit__ != "{{RAY_COMMIT_SHA}}" and "dev" in ray.__version__
  310. def usage_stats_enabled() -> bool:
  311. return _usage_stats_enabledness() is not UsageStatsEnabledness.DISABLED_EXPLICITLY
  312. def usage_stats_prompt_enabled():
  313. return int(os.getenv("RAY_USAGE_STATS_PROMPT_ENABLED", "1")) == 1
  314. def _generate_cluster_metadata(*, ray_init_cluster: bool):
  315. """Return a dictionary of cluster metadata.
  316. Params:
  317. ray_init_cluster: Whether the cluster is started by ray.init()
  318. Returns:
  319. A dictionary of cluster metadata.
  320. """
  321. ray_version, python_version = ray._private.utils.compute_version_info()
  322. # These two metadata is necessary although usage report is not enabled
  323. # to check version compatibility.
  324. metadata = {
  325. "ray_version": ray_version,
  326. "python_version": python_version,
  327. "ray_init_cluster": ray_init_cluster,
  328. }
  329. # Additional metadata is recorded only when usage stats are enabled.
  330. if usage_stats_enabled():
  331. metadata.update(
  332. {
  333. "git_commit": ray.__commit__,
  334. "os": sys.platform,
  335. "session_start_timestamp_ms": int(time.time() * 1000),
  336. }
  337. )
  338. if sys.platform == "linux":
  339. # Record llibc version
  340. (lib, ver) = platform.libc_ver()
  341. if not lib:
  342. metadata.update({"libc_version": "NA"})
  343. else:
  344. metadata.update({"libc_version": f"{lib}:{ver}"})
  345. return metadata
  346. def show_usage_stats_prompt(cli: bool) -> None:
  347. if not usage_stats_prompt_enabled():
  348. return
  349. from ray.autoscaler._private.cli_logger import cli_logger
  350. prompt_print = cli_logger.print if cli else print
  351. usage_stats_enabledness = _usage_stats_enabledness()
  352. if usage_stats_enabledness is UsageStatsEnabledness.DISABLED_EXPLICITLY:
  353. prompt_print(usage_constant.USAGE_STATS_DISABLED_MESSAGE)
  354. elif usage_stats_enabledness is UsageStatsEnabledness.ENABLED_BY_DEFAULT:
  355. if not cli:
  356. prompt_print(
  357. usage_constant.USAGE_STATS_ENABLED_BY_DEFAULT_FOR_RAY_INIT_MESSAGE
  358. )
  359. elif cli_logger.interactive:
  360. enabled = cli_logger.confirm(
  361. False,
  362. usage_constant.USAGE_STATS_CONFIRMATION_MESSAGE,
  363. _default=True,
  364. _timeout_s=10,
  365. )
  366. set_usage_stats_enabled_via_env_var(enabled)
  367. # Remember user's choice.
  368. try:
  369. set_usage_stats_enabled_via_config(enabled)
  370. except Exception as e:
  371. logger.debug(
  372. f"Failed to persist usage stats choice for future clusters: {e}"
  373. )
  374. if enabled:
  375. prompt_print(usage_constant.USAGE_STATS_ENABLED_FOR_CLI_MESSAGE)
  376. else:
  377. prompt_print(usage_constant.USAGE_STATS_DISABLED_MESSAGE)
  378. else:
  379. prompt_print(
  380. usage_constant.USAGE_STATS_ENABLED_BY_DEFAULT_FOR_CLI_MESSAGE,
  381. )
  382. else:
  383. assert usage_stats_enabledness is UsageStatsEnabledness.ENABLED_EXPLICITLY
  384. prompt_print(
  385. usage_constant.USAGE_STATS_ENABLED_FOR_CLI_MESSAGE
  386. if cli
  387. else usage_constant.USAGE_STATS_ENABLED_FOR_RAY_INIT_MESSAGE
  388. )
  389. def set_usage_stats_enabled_via_config(enabled) -> None:
  390. config = {}
  391. try:
  392. with open(_usage_stats_config_path()) as f:
  393. config = json.load(f)
  394. if not isinstance(config, dict):
  395. logger.debug(
  396. f"Invalid ray config file, should be a json dict but got {type(config)}"
  397. )
  398. config = {}
  399. except FileNotFoundError:
  400. pass
  401. except Exception as e:
  402. logger.debug(f"Failed to load ray config file {e}")
  403. config["usage_stats"] = enabled
  404. try:
  405. os.makedirs(os.path.dirname(_usage_stats_config_path()), exist_ok=True)
  406. with open(_usage_stats_config_path(), "w") as f:
  407. json.dump(config, f)
  408. except Exception as e:
  409. raise Exception(
  410. "Failed to "
  411. f'{"enable" if enabled else "disable"}'
  412. ' usage stats by writing {"usage_stats": '
  413. f'{"true" if enabled else "false"}'
  414. "} to "
  415. f"{_usage_stats_config_path()}"
  416. ) from e
  417. def set_usage_stats_enabled_via_env_var(enabled) -> None:
  418. os.environ[usage_constant.USAGE_STATS_ENABLED_ENV_VAR] = "1" if enabled else "0"
  419. def put_cluster_metadata(gcs_client: GcsClient, *, ray_init_cluster: bool) -> dict:
  420. """Generate the cluster metadata and store it to GCS.
  421. It is a blocking API.
  422. Params:
  423. gcs_client: The GCS client to perform KV operation PUT.
  424. ray_init_cluster: Whether the cluster is started by ray.init()
  425. Raises:
  426. gRPC exceptions: If PUT fails.
  427. Returns:
  428. The cluster metadata.
  429. """
  430. metadata = _generate_cluster_metadata(ray_init_cluster=ray_init_cluster)
  431. gcs_client.internal_kv_put(
  432. usage_constant.CLUSTER_METADATA_KEY,
  433. json.dumps(metadata).encode(),
  434. overwrite=True,
  435. namespace=ray_constants.KV_NAMESPACE_CLUSTER,
  436. )
  437. return metadata
  438. def get_total_num_running_jobs_to_report(gcs_client) -> Optional[int]:
  439. """Return the total number of running jobs in the cluster excluding internal ones"""
  440. try:
  441. result = gcs_client.get_all_job_info(
  442. skip_submission_job_info_field=True, skip_is_running_tasks_field=True
  443. )
  444. total_num_running_jobs = 0
  445. for job_info in result.values():
  446. if not job_info.is_dead and not job_info.config.ray_namespace.startswith(
  447. "_ray_internal"
  448. ):
  449. total_num_running_jobs += 1
  450. return total_num_running_jobs
  451. except Exception as e:
  452. logger.info(f"Failed to query number of running jobs in the cluster: {e}")
  453. return None
  454. def get_total_num_alive_nodes_to_report(gcs_client, timeout=None) -> Optional[int]:
  455. """Return the total number of alive nodes in the cluster"""
  456. try:
  457. result = gcs_client.get_all_node_info(
  458. timeout=timeout, state_filter=gcs_pb2.GcsNodeInfo.GcsNodeState.ALIVE
  459. )
  460. return len(result.items())
  461. except Exception as e:
  462. logger.info(f"Failed to query number of nodes in the cluster: {e}")
  463. return None
  464. def get_library_usages_to_report(gcs_client) -> List[str]:
  465. return list(_get_usage_set(gcs_client, usage_constant.LIBRARY_USAGE_SET_NAME))
  466. def get_hardware_usages_to_report(gcs_client) -> List[str]:
  467. return list(_get_usage_set(gcs_client, usage_constant.HARDWARE_USAGE_SET_NAME))
  468. def get_extra_usage_tags_to_report(gcs_client: GcsClient) -> Dict[str, str]:
  469. """Get the extra usage tags from env var and gcs kv store.
  470. The env var should be given this way; key=value;key=value.
  471. If parsing is failed, it will return the empty data.
  472. Params:
  473. gcs_client: The GCS client.
  474. Returns:
  475. Extra usage tags as kv pairs.
  476. """
  477. extra_usage_tags = dict()
  478. extra_usage_tags_env_var = os.getenv("RAY_USAGE_STATS_EXTRA_TAGS", None)
  479. if extra_usage_tags_env_var:
  480. try:
  481. kvs = extra_usage_tags_env_var.strip(";").split(";")
  482. for kv in kvs:
  483. k, v = kv.split("=")
  484. extra_usage_tags[k] = v
  485. except Exception as e:
  486. logger.info(f"Failed to parse extra usage tags env var: {e}")
  487. valid_tag_keys = [tag_key.lower() for tag_key in TagKey.keys()]
  488. try:
  489. keys = gcs_client.internal_kv_keys(
  490. usage_constant.EXTRA_USAGE_TAG_PREFIX.encode(),
  491. namespace=usage_constant.USAGE_STATS_NAMESPACE.encode(),
  492. )
  493. kv = gcs_client.internal_kv_multi_get(
  494. keys, namespace=usage_constant.USAGE_STATS_NAMESPACE.encode()
  495. )
  496. for key, value in kv.items():
  497. key = key.decode("utf-8")
  498. key = key[len(usage_constant.EXTRA_USAGE_TAG_PREFIX) :]
  499. assert key in valid_tag_keys
  500. extra_usage_tags[key] = value.decode("utf-8")
  501. except Exception as e:
  502. logger.info(f"Failed to get extra usage tags from kv store: {e}")
  503. return extra_usage_tags
  504. def _get_cluster_status_to_report_v2(gcs_client: GcsClient) -> ClusterStatusToReport:
  505. """
  506. Get the current status of this cluster. A temporary proxy for the
  507. autoscaler v2 API.
  508. It is a blocking API.
  509. Params:
  510. gcs_client: The GCS client.
  511. Returns:
  512. The current cluster status or empty ClusterStatusToReport
  513. if it fails to get that information.
  514. """
  515. from ray.autoscaler.v2.sdk import get_cluster_status
  516. result = ClusterStatusToReport()
  517. try:
  518. cluster_status = get_cluster_status(gcs_client.address)
  519. total_resources = cluster_status.total_resources()
  520. result.total_num_cpus = int(total_resources.get("CPU", 0))
  521. result.total_num_gpus = int(total_resources.get("GPU", 0))
  522. to_GiB = 1 / 2**30
  523. result.total_memory_gb = total_resources.get("memory", 0) * to_GiB
  524. result.total_object_store_memory_gb = (
  525. total_resources.get("object_store_memory", 0) * to_GiB
  526. )
  527. except Exception as e:
  528. logger.info(f"Failed to get cluster status to report {e}")
  529. finally:
  530. return result
  531. def get_cluster_status_to_report(gcs_client: GcsClient) -> ClusterStatusToReport:
  532. """Get the current status of this cluster.
  533. It is a blocking API.
  534. Params:
  535. gcs_client: The GCS client to perform KV operation GET.
  536. Returns:
  537. The current cluster status or empty if it fails to get that information.
  538. """
  539. try:
  540. from ray.autoscaler.v2.utils import is_autoscaler_v2
  541. if is_autoscaler_v2():
  542. return _get_cluster_status_to_report_v2(gcs_client)
  543. cluster_status = gcs_client.internal_kv_get(
  544. ray._private.ray_constants.DEBUG_AUTOSCALING_STATUS.encode(),
  545. namespace=None,
  546. )
  547. if not cluster_status:
  548. return ClusterStatusToReport()
  549. result = ClusterStatusToReport()
  550. to_GiB = 1 / 2**30
  551. cluster_status = json.loads(cluster_status.decode("utf-8"))
  552. if (
  553. "load_metrics_report" not in cluster_status
  554. or "usage" not in cluster_status["load_metrics_report"]
  555. ):
  556. return ClusterStatusToReport()
  557. usage = cluster_status["load_metrics_report"]["usage"]
  558. # usage is a map from resource to (used, total) pair
  559. if "CPU" in usage:
  560. result.total_num_cpus = int(usage["CPU"][1])
  561. if "GPU" in usage:
  562. result.total_num_gpus = int(usage["GPU"][1])
  563. if "memory" in usage:
  564. result.total_memory_gb = usage["memory"][1] * to_GiB
  565. if "object_store_memory" in usage:
  566. result.total_object_store_memory_gb = (
  567. usage["object_store_memory"][1] * to_GiB
  568. )
  569. return result
  570. except Exception as e:
  571. logger.info(f"Failed to get cluster status to report {e}")
  572. return ClusterStatusToReport()
  573. def get_cloud_from_metadata_requests() -> str:
  574. def cloud_metadata_request(url: str, headers: Optional[Dict[str, str]]) -> bool:
  575. try:
  576. res = requests.get(url, headers=headers, timeout=1)
  577. # Only accept successful responses (200 OK) to avoid false positives like 400 - Bad Request
  578. # when multiple cloud providers use the same IP (169.254.169.254)
  579. if res.status_code == 200:
  580. return True
  581. # ConnectionError is a superclass of ConnectTimeout
  582. except requests.exceptions.ConnectionError:
  583. pass
  584. except Exception as e:
  585. logger.info(
  586. f"Unexpected exception when making cloud provider metadata request: {e}"
  587. )
  588. return False
  589. AZURE_METADATA_URL = (
  590. "http://169.254.169.254/metadata/instance?api-version=2021-12-13"
  591. )
  592. AZURE_METADATA_HEADERS = {"Metadata": "true"}
  593. GCP_METADATA_URL = "http://metadata.google.internal/computeMetadata/v1"
  594. GCP_METADATA_HEADERS = {"Metadata-Flavor": "Google"}
  595. AWS_METADATA_URL = "http://169.254.169.254/latest/meta-data/"
  596. AWS_METADATA_HEADERS = None
  597. if cloud_metadata_request(AZURE_METADATA_URL, AZURE_METADATA_HEADERS):
  598. return "azure"
  599. elif cloud_metadata_request(GCP_METADATA_URL, GCP_METADATA_HEADERS):
  600. return "gcp"
  601. elif cloud_metadata_request(AWS_METADATA_URL, AWS_METADATA_HEADERS):
  602. return "aws"
  603. else:
  604. return "unknown"
  605. def get_cluster_config_to_report(
  606. cluster_config_file_path: str,
  607. ) -> ClusterConfigToReport:
  608. """Get the static cluster (autoscaler) config used to launch this cluster.
  609. Params:
  610. cluster_config_file_path: The file path to the cluster config file.
  611. Returns:
  612. The cluster (autoscaler) config or empty if it fails to get that information.
  613. """
  614. def get_instance_type(node_config):
  615. if not node_config:
  616. return None
  617. if "InstanceType" in node_config:
  618. # aws
  619. return node_config["InstanceType"]
  620. if "machineType" in node_config:
  621. # gcp
  622. return node_config["machineType"]
  623. if (
  624. "azure_arm_parameters" in node_config
  625. and "vmSize" in node_config["azure_arm_parameters"]
  626. ):
  627. return node_config["azure_arm_parameters"]["vmSize"]
  628. return None
  629. try:
  630. with open(cluster_config_file_path) as f:
  631. config = yaml.safe_load(f)
  632. result = ClusterConfigToReport()
  633. if "min_workers" in config:
  634. result.min_workers = config["min_workers"]
  635. if "max_workers" in config:
  636. result.max_workers = config["max_workers"]
  637. if "provider" in config and "type" in config["provider"]:
  638. result.cloud_provider = config["provider"]["type"]
  639. if "head_node_type" not in config:
  640. return result
  641. if "available_node_types" not in config:
  642. return result
  643. head_node_type = config["head_node_type"]
  644. available_node_types = config["available_node_types"]
  645. for available_node_type in available_node_types:
  646. if available_node_type == head_node_type:
  647. head_node_instance_type = get_instance_type(
  648. available_node_types[available_node_type].get("node_config")
  649. )
  650. if head_node_instance_type:
  651. result.head_node_instance_type = head_node_instance_type
  652. else:
  653. worker_node_instance_type = get_instance_type(
  654. available_node_types[available_node_type].get("node_config")
  655. )
  656. if worker_node_instance_type:
  657. result.worker_node_instance_types = (
  658. result.worker_node_instance_types or set()
  659. )
  660. result.worker_node_instance_types.add(worker_node_instance_type)
  661. if result.worker_node_instance_types:
  662. result.worker_node_instance_types = list(
  663. result.worker_node_instance_types
  664. )
  665. return result
  666. except FileNotFoundError:
  667. # It's a manually started cluster or k8s cluster
  668. result = ClusterConfigToReport()
  669. # Check if we're on Kubernetes
  670. if usage_constant.KUBERNETES_SERVICE_HOST_ENV in os.environ:
  671. # Check if we're using KubeRay >= 0.4.0.
  672. if usage_constant.KUBERAY_ENV in os.environ:
  673. result.cloud_provider = usage_constant.PROVIDER_KUBERAY
  674. # Else, we're on Kubernetes but not in either of the above categories.
  675. else:
  676. result.cloud_provider = usage_constant.PROVIDER_KUBERNETES_GENERIC
  677. # if kubernetes was not set as cloud_provider vs. was set before
  678. if result.cloud_provider is None:
  679. result.cloud_provider = get_cloud_from_metadata_requests()
  680. else:
  681. result.cloud_provider += f"_${get_cloud_from_metadata_requests()}"
  682. return result
  683. except Exception as e:
  684. logger.info(f"Failed to get cluster config to report {e}")
  685. return ClusterConfigToReport()
  686. def get_cluster_metadata(gcs_client: GcsClient) -> dict:
  687. """Get the cluster metadata from GCS.
  688. It is a blocking API.
  689. This will return None if `put_cluster_metadata` was never called.
  690. Params:
  691. gcs_client: The GCS client to perform KV operation GET.
  692. Returns:
  693. The cluster metadata in a dictionary.
  694. Raises:
  695. RuntimeError: If it fails to obtain cluster metadata from GCS.
  696. """
  697. return json.loads(
  698. gcs_client.internal_kv_get(
  699. usage_constant.CLUSTER_METADATA_KEY,
  700. namespace=ray_constants.KV_NAMESPACE_CLUSTER,
  701. ).decode("utf-8")
  702. )
  703. def is_ray_init_cluster(gcs_client: ray._raylet.GcsClient) -> bool:
  704. """Return whether the cluster is started by ray.init()"""
  705. cluster_metadata = get_cluster_metadata(gcs_client)
  706. return cluster_metadata["ray_init_cluster"]
  707. def generate_disabled_report_data() -> UsageStatsToReport:
  708. """Generate the report data indicating usage stats is disabled"""
  709. data = UsageStatsToReport(
  710. schema_version=usage_constant.SCHEMA_VERSION,
  711. source=os.getenv(
  712. usage_constant.USAGE_STATS_SOURCE_ENV_VAR,
  713. usage_constant.USAGE_STATS_SOURCE_OSS,
  714. ),
  715. collect_timestamp_ms=int(time.time() * 1000),
  716. )
  717. return data
  718. def generate_report_data(
  719. cluster_config_to_report: ClusterConfigToReport,
  720. total_success: int,
  721. total_failed: int,
  722. seq_number: int,
  723. gcs_address: str,
  724. cluster_id: str,
  725. ) -> UsageStatsToReport:
  726. """Generate the report data.
  727. Params:
  728. cluster_config_to_report: The cluster (autoscaler)
  729. config generated by `get_cluster_config_to_report`.
  730. total_success: The total number of successful report
  731. for the lifetime of the cluster.
  732. total_failed: The total number of failed report
  733. for the lifetime of the cluster.
  734. seq_number: The sequence number that's incremented whenever
  735. a new report is sent.
  736. gcs_address: the address of gcs to get data to report.
  737. cluster_id: hex id of the cluster.
  738. Returns:
  739. UsageStats
  740. """
  741. assert cluster_id
  742. gcs_client = ray._raylet.GcsClient(address=gcs_address, cluster_id=cluster_id)
  743. cluster_metadata = get_cluster_metadata(gcs_client)
  744. cluster_status_to_report = get_cluster_status_to_report(gcs_client)
  745. data = UsageStatsToReport(
  746. schema_version=usage_constant.SCHEMA_VERSION,
  747. source=os.getenv(
  748. usage_constant.USAGE_STATS_SOURCE_ENV_VAR,
  749. usage_constant.USAGE_STATS_SOURCE_OSS,
  750. ),
  751. collect_timestamp_ms=int(time.time() * 1000),
  752. total_success=total_success,
  753. total_failed=total_failed,
  754. seq_number=seq_number,
  755. ray_version=cluster_metadata["ray_version"],
  756. python_version=cluster_metadata["python_version"],
  757. session_id=cluster_id,
  758. git_commit=cluster_metadata["git_commit"],
  759. os=cluster_metadata["os"],
  760. session_start_timestamp_ms=cluster_metadata["session_start_timestamp_ms"],
  761. cloud_provider=cluster_config_to_report.cloud_provider,
  762. min_workers=cluster_config_to_report.min_workers,
  763. max_workers=cluster_config_to_report.max_workers,
  764. head_node_instance_type=cluster_config_to_report.head_node_instance_type,
  765. worker_node_instance_types=cluster_config_to_report.worker_node_instance_types,
  766. total_num_cpus=cluster_status_to_report.total_num_cpus,
  767. total_num_gpus=cluster_status_to_report.total_num_gpus,
  768. total_memory_gb=cluster_status_to_report.total_memory_gb,
  769. total_object_store_memory_gb=cluster_status_to_report.total_object_store_memory_gb, # noqa: E501
  770. library_usages=get_library_usages_to_report(gcs_client),
  771. extra_usage_tags=get_extra_usage_tags_to_report(gcs_client),
  772. total_num_nodes=get_total_num_alive_nodes_to_report(gcs_client),
  773. total_num_running_jobs=get_total_num_running_jobs_to_report(gcs_client),
  774. libc_version=cluster_metadata.get("libc_version"),
  775. hardware_usages=get_hardware_usages_to_report(gcs_client),
  776. )
  777. return data
  778. def generate_write_data(
  779. usage_stats: UsageStatsToReport,
  780. error: str,
  781. ) -> UsageStatsToWrite:
  782. """Generate the report data.
  783. Params:
  784. usage_stats: The usage stats that were reported.
  785. error: The error message of failed reports.
  786. Returns:
  787. UsageStatsToWrite
  788. """
  789. data = UsageStatsToWrite(
  790. usage_stats=usage_stats,
  791. success=error is None,
  792. error=error,
  793. )
  794. return data
  795. class UsageReportClient:
  796. """The client implementation for usage report.
  797. It is in charge of writing usage stats to the directory
  798. and report usage stats.
  799. """
  800. def write_usage_data(self, data: UsageStatsToWrite, dir_path: str) -> None:
  801. """Write the usage data to the directory.
  802. Params:
  803. data: Data to report
  804. dir_path: The path to the directory to write usage data.
  805. """
  806. # Atomically update the file.
  807. dir_path = Path(dir_path)
  808. destination = dir_path / usage_constant.USAGE_STATS_FILE
  809. temp = dir_path / f"{usage_constant.USAGE_STATS_FILE}.tmp"
  810. with temp.open(mode="w") as json_file:
  811. json_file.write(json.dumps(asdict(data)))
  812. if sys.platform == "win32":
  813. # Windows 32 doesn't support atomic renaming, so we should delete
  814. # the file first.
  815. destination.unlink(missing_ok=True)
  816. temp.rename(destination)
  817. def report_usage_data(self, url: str, data: UsageStatsToReport) -> None:
  818. """Report the usage data to the usage server.
  819. Params:
  820. url: The URL to update resource usage.
  821. data: Data to report.
  822. Raises:
  823. requests.HTTPError: If requests fails.
  824. """
  825. r = requests.request(
  826. "POST",
  827. url,
  828. headers={"Content-Type": "application/json"},
  829. json=asdict(data),
  830. timeout=10,
  831. )
  832. r.raise_for_status()
  833. return r