autoscaler.py 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572
  1. import copy
  2. import logging
  3. import math
  4. import operator
  5. import os
  6. import queue
  7. import subprocess
  8. import threading
  9. import time
  10. from collections import Counter, defaultdict, namedtuple
  11. from dataclasses import dataclass, field
  12. from enum import Enum
  13. from typing import Any, Callable, Dict, FrozenSet, List, Optional, Set, Tuple, Union
  14. import yaml
  15. import ray
  16. from ray._common.utils import PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME
  17. from ray.autoscaler._private.constants import (
  18. AUTOSCALER_HEARTBEAT_TIMEOUT_S,
  19. AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
  20. AUTOSCALER_MAX_LAUNCH_BATCH,
  21. AUTOSCALER_MAX_NUM_FAILURES,
  22. AUTOSCALER_STATUS_LOG,
  23. AUTOSCALER_UPDATE_INTERVAL_S,
  24. DISABLE_LAUNCH_CONFIG_CHECK_KEY,
  25. DISABLE_NODE_UPDATERS_KEY,
  26. FOREGROUND_NODE_LAUNCH_KEY,
  27. WORKER_LIVENESS_CHECK_KEY,
  28. )
  29. from ray.autoscaler._private.event_summarizer import EventSummarizer
  30. from ray.autoscaler._private.legacy_info_string import legacy_log_info_string
  31. from ray.autoscaler._private.load_metrics import LoadMetrics
  32. from ray.autoscaler._private.local.node_provider import (
  33. LocalNodeProvider,
  34. record_local_head_state_if_needed,
  35. )
  36. from ray.autoscaler._private.node_launcher import BaseNodeLauncher, NodeLauncher
  37. from ray.autoscaler._private.node_provider_availability_tracker import (
  38. NodeAvailabilitySummary,
  39. NodeProviderAvailabilityTracker,
  40. )
  41. from ray.autoscaler._private.node_tracker import NodeTracker
  42. from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
  43. from ray.autoscaler._private.providers import _get_node_provider
  44. from ray.autoscaler._private.resource_demand_scheduler import (
  45. ResourceDemandScheduler,
  46. ResourceDict,
  47. get_bin_pack_residual,
  48. placement_groups_to_resource_demands,
  49. )
  50. from ray.autoscaler._private.updater import NodeUpdaterThread
  51. from ray.autoscaler._private.util import (
  52. ConcurrentCounter,
  53. NodeCount,
  54. NodeID,
  55. NodeIP,
  56. NodeType,
  57. NodeTypeConfigDict,
  58. format_info_string,
  59. hash_launch_conf,
  60. hash_runtime_conf,
  61. validate_config,
  62. with_head_node_ip,
  63. )
  64. from ray.autoscaler.node_provider import NodeProvider
  65. from ray.autoscaler.tags import (
  66. NODE_KIND_HEAD,
  67. NODE_KIND_UNMANAGED,
  68. NODE_KIND_WORKER,
  69. STATUS_UP_TO_DATE,
  70. STATUS_UPDATE_FAILED,
  71. TAG_RAY_FILE_MOUNTS_CONTENTS,
  72. TAG_RAY_LAUNCH_CONFIG,
  73. TAG_RAY_NODE_KIND,
  74. TAG_RAY_NODE_STATUS,
  75. TAG_RAY_RUNTIME_CONFIG,
  76. TAG_RAY_USER_NODE_TYPE,
  77. )
  78. from ray.exceptions import RpcError
  79. logger = logging.getLogger(__name__)
  80. # Status of a node e.g. "up-to-date", see ray/autoscaler/tags.py
  81. NodeStatus = str
  82. # Tuple of modified fields for the given node_id returned by should_update
  83. # that will be passed into a NodeUpdaterThread.
  84. UpdateInstructions = namedtuple(
  85. "UpdateInstructions",
  86. ["node_id", "setup_commands", "ray_start_commands", "docker_config"],
  87. )
  88. NodeLaunchData = Tuple[NodeTypeConfigDict, NodeCount, Optional[NodeType]]
  89. @dataclass
  90. class AutoscalerSummary:
  91. active_nodes: Dict[NodeType, int]
  92. idle_nodes: Optional[Dict[NodeType, int]]
  93. pending_nodes: List[Tuple[NodeIP, NodeType, NodeStatus]]
  94. pending_launches: Dict[NodeType, int]
  95. failed_nodes: List[Tuple[NodeIP, NodeType]]
  96. node_availability_summary: NodeAvailabilitySummary = field(
  97. default_factory=lambda: NodeAvailabilitySummary({})
  98. )
  99. # A dictionary of node IP to a list of reasons the node is not idle.
  100. node_activities: Optional[Dict[str, Tuple[NodeIP, List[str]]]] = None
  101. pending_resources: Dict[str, int] = field(default_factory=lambda: {})
  102. # A mapping from node name (the same key as `usage_by_node`) to node type.
  103. # Optional for deployment modes which have the concept of node types and
  104. # backwards compatibility.
  105. node_type_mapping: Optional[Dict[str, str]] = None
  106. # Whether the autoscaler summary is v1 or v2.
  107. legacy: bool = False
  108. class NonTerminatedNodes:
  109. """Class to extract and organize information on non-terminated nodes."""
  110. def __init__(self, provider: NodeProvider):
  111. start_time = time.time()
  112. # All non-terminated nodes
  113. self.all_node_ids = provider.non_terminated_nodes({})
  114. # Managed worker nodes (node kind "worker"):
  115. self.worker_ids: List[NodeID] = []
  116. # The head node (node kind "head")
  117. self.head_id: Optional[NodeID] = None
  118. for node in self.all_node_ids:
  119. node_kind = provider.node_tags(node)[TAG_RAY_NODE_KIND]
  120. if node_kind == NODE_KIND_WORKER:
  121. self.worker_ids.append(node)
  122. elif node_kind == NODE_KIND_HEAD:
  123. self.head_id = node
  124. # Note: For typical use-cases, self.all_node_ids == self.worker_ids +
  125. # [self.head_id]. The difference being in the case of unmanaged nodes.
  126. # Record the time of the non_terminated nodes call. This typically
  127. # translates to a "describe" or "list" call on most cluster managers
  128. # which can be quite expensive. Note that we include the processing
  129. # time because on some clients, there may be pagination and the
  130. # underlying api calls may be done lazily.
  131. self.non_terminated_nodes_time = time.time() - start_time
  132. logger.info(
  133. f"The autoscaler took {round(self.non_terminated_nodes_time, 3)}"
  134. " seconds to fetch the list of non-terminated nodes."
  135. )
  136. def remove_terminating_nodes(self, terminating_nodes: List[NodeID]) -> None:
  137. """Remove nodes we're in the process of terminating from internal
  138. state."""
  139. def not_terminating(node):
  140. return node not in terminating_nodes
  141. self.worker_ids = list(filter(not_terminating, self.worker_ids))
  142. self.all_node_ids = list(filter(not_terminating, self.all_node_ids))
  143. # Whether a worker should be kept based on the min_workers and
  144. # max_workers constraints.
  145. #
  146. # keep: should keep the worker
  147. # terminate: should terminate the worker
  148. # decide_later: the worker can be terminated if needed
  149. KeepOrTerminate = Enum("KeepOrTerminate", "keep terminate decide_later")
  150. class StandardAutoscaler:
  151. """The autoscaling control loop for a Ray cluster.
  152. There are two ways to start an autoscaling cluster: manually by running
  153. `ray start --head --autoscaling-config=/path/to/config.yaml` on a instance
  154. that has permission to launch other instances, or you can also use `ray up
  155. /path/to/config.yaml` from your laptop, which will configure the right
  156. AWS/Cloud roles automatically. See the Ray documentation
  157. (https://docs.ray.io/en/latest/) for a full definition of autoscaling behavior.
  158. StandardAutoscaler's `update` method is periodically called in
  159. `monitor.py`'s monitoring loop.
  160. StandardAutoscaler is also used to bootstrap clusters (by adding workers
  161. until the cluster size that can handle the resource demand is met).
  162. """
  163. def __init__(
  164. self,
  165. # TODO(ekl): require config reader to be a callable always.
  166. config_reader: Union[str, Callable[[], dict]],
  167. load_metrics: LoadMetrics,
  168. gcs_client: "ray._raylet.GcsClient",
  169. session_name: Optional[str] = None,
  170. max_launch_batch: int = AUTOSCALER_MAX_LAUNCH_BATCH,
  171. max_concurrent_launches: int = AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
  172. max_failures: int = AUTOSCALER_MAX_NUM_FAILURES,
  173. process_runner: Any = subprocess,
  174. update_interval_s: int = AUTOSCALER_UPDATE_INTERVAL_S,
  175. prefix_cluster_info: bool = False,
  176. event_summarizer: Optional[EventSummarizer] = None,
  177. prom_metrics: Optional[AutoscalerPrometheusMetrics] = None,
  178. ):
  179. """Create a StandardAutoscaler.
  180. Args:
  181. config_reader: Path to a Ray Autoscaler YAML, or a function to read
  182. and return the latest config.
  183. load_metrics: Provides metrics for the Ray cluster.
  184. session_name: The current Ray session name when this autoscaler
  185. is deployed.
  186. max_launch_batch: Max number of nodes to launch in one request.
  187. max_concurrent_launches: Max number of nodes that can be
  188. concurrently launched. This value and `max_launch_batch`
  189. determine the number of batches that are used to launch nodes.
  190. max_failures: Number of failures that the autoscaler will tolerate
  191. before exiting.
  192. process_runner: Subproc-like interface used by the CommandRunner.
  193. update_interval_s: Seconds between running the autoscaling loop.
  194. prefix_cluster_info: Whether to add the cluster name to info strs.
  195. event_summarizer: Utility to consolidate duplicated messages.
  196. prom_metrics: Prometheus metrics for autoscaler-related operations.
  197. gcs_client: client for interactions with the GCS. Used to drain nodes
  198. before termination.
  199. """
  200. if isinstance(config_reader, str):
  201. # Auto wrap with file reader.
  202. def read_fn():
  203. with open(config_reader) as f:
  204. new_config = yaml.safe_load(f.read())
  205. return new_config
  206. self.config_reader = read_fn
  207. else:
  208. self.config_reader = config_reader
  209. self.node_provider_availability_tracker = NodeProviderAvailabilityTracker()
  210. # Prefix each line of info string with cluster name if True
  211. self.prefix_cluster_info = prefix_cluster_info
  212. # Keep this before self.reset (self.provider needs to be created
  213. # exactly once).
  214. self.provider = None
  215. # Keep this before self.reset (if an exception occurs in reset
  216. # then prom_metrics must be instantitiated to increment the
  217. # exception counter)
  218. self.prom_metrics = prom_metrics or AutoscalerPrometheusMetrics(
  219. session_name=session_name
  220. ) # noqa
  221. self.resource_demand_scheduler = None
  222. self.reset(errors_fatal=True)
  223. self.load_metrics = load_metrics
  224. self.max_failures = max_failures
  225. self.max_launch_batch = max_launch_batch
  226. self.max_concurrent_launches = max_concurrent_launches
  227. self.process_runner = process_runner
  228. self.event_summarizer = event_summarizer or EventSummarizer()
  229. # Map from node_id to NodeUpdater threads
  230. self.updaters: Dict[NodeID, NodeUpdaterThread] = {}
  231. self.num_failed_updates: Dict[NodeID, int] = defaultdict(int)
  232. self.num_successful_updates: Dict[NodeID, int] = defaultdict(int)
  233. self.num_failures = 0
  234. self.last_update_time = 0.0
  235. self.update_interval_s = update_interval_s
  236. # Keeps track of pending and running nodes
  237. self.non_terminated_nodes: Optional[NonTerminatedNodes] = None
  238. # Tracks nodes scheduled for termination
  239. self.nodes_to_terminate: List[NodeID] = []
  240. # Disable NodeUpdater threads if true.
  241. # Should be set to true in situations where another component, such as
  242. # a Kubernetes operator, is responsible for Ray setup on nodes.
  243. self.disable_node_updaters = self.config["provider"].get(
  244. DISABLE_NODE_UPDATERS_KEY, False
  245. )
  246. logger.info(f"{DISABLE_NODE_UPDATERS_KEY}:{self.disable_node_updaters}")
  247. # Disable launch configuration checking if set to true.
  248. # This setting is used in scenarios where there is no meaningful node type
  249. # to enforce, such as in fake multinode situations. When this option is enabled,
  250. # outdated nodes will not be terminated.
  251. self.disable_launch_config_check = self.config["provider"].get(
  252. DISABLE_LAUNCH_CONFIG_CHECK_KEY, False
  253. )
  254. logger.info(
  255. f"{DISABLE_LAUNCH_CONFIG_CHECK_KEY}:{self.disable_launch_config_check}"
  256. )
  257. # By default, the autoscaler launches nodes in batches asynchronously in
  258. # background threads.
  259. # When the following flag is set, that behavior is disabled, so that nodes
  260. # are launched in the main thread, all in one batch, blocking until all
  261. # NodeProvider.create_node calls have returned.
  262. self.foreground_node_launch = self.config["provider"].get(
  263. FOREGROUND_NODE_LAUNCH_KEY, False
  264. )
  265. logger.info(f"{FOREGROUND_NODE_LAUNCH_KEY}:{self.foreground_node_launch}")
  266. # By default, the autoscaler kills and/or tries to recover
  267. # a worker node if it hasn't produced a resource heartbeat in the last 30
  268. # seconds. The worker_liveness_check flag allows disabling this behavior in
  269. # settings where another component, such as a Kubernetes operator, is
  270. # responsible for healthchecks.
  271. self.worker_liveness_check = self.config["provider"].get(
  272. WORKER_LIVENESS_CHECK_KEY, True
  273. )
  274. logger.info(f"{WORKER_LIVENESS_CHECK_KEY}:{self.worker_liveness_check}")
  275. # Node launchers
  276. self.foreground_node_launcher: Optional[BaseNodeLauncher] = None
  277. self.launch_queue: Optional[queue.Queue[NodeLaunchData]] = None
  278. self.pending_launches = ConcurrentCounter()
  279. if self.foreground_node_launch:
  280. self.foreground_node_launcher = BaseNodeLauncher(
  281. provider=self.provider,
  282. pending=self.pending_launches,
  283. event_summarizer=self.event_summarizer,
  284. node_provider_availability_tracker=self.node_provider_availability_tracker, # noqa: E501 Flake and black disagree how to format this.
  285. session_name=session_name,
  286. node_types=self.available_node_types,
  287. prom_metrics=self.prom_metrics,
  288. )
  289. else:
  290. self.launch_queue = queue.Queue()
  291. max_batches = math.ceil(max_concurrent_launches / float(max_launch_batch))
  292. for i in range(int(max_batches)):
  293. node_launcher = NodeLauncher(
  294. provider=self.provider,
  295. queue=self.launch_queue,
  296. index=i,
  297. pending=self.pending_launches,
  298. event_summarizer=self.event_summarizer,
  299. node_provider_availability_tracker=self.node_provider_availability_tracker, # noqa: E501 Flake and black disagreee how to format this.
  300. session_name=session_name,
  301. node_types=self.available_node_types,
  302. prom_metrics=self.prom_metrics,
  303. )
  304. node_launcher.daemon = True
  305. node_launcher.start()
  306. # NodeTracker maintains soft state to track the number of recently
  307. # failed nodes. It is best effort only.
  308. self.node_tracker = NodeTracker()
  309. # Expand local file_mounts to allow ~ in the paths. This can't be done
  310. # earlier when the config is written since we might be on different
  311. # platform and the expansion would result in wrong path.
  312. self.config["file_mounts"] = {
  313. remote: os.path.expanduser(local)
  314. for remote, local in self.config["file_mounts"].items()
  315. }
  316. self.gcs_client = gcs_client
  317. for local_path in self.config["file_mounts"].values():
  318. assert os.path.exists(local_path)
  319. logger.info("StandardAutoscaler: {}".format(self.config))
  320. @property
  321. def all_node_types(self) -> Set[str]:
  322. return self.config["available_node_types"].keys()
  323. def update(self):
  324. try:
  325. self.reset(errors_fatal=False)
  326. self._update()
  327. except Exception as e:
  328. self.prom_metrics.update_loop_exceptions.inc()
  329. logger.exception("StandardAutoscaler: Error during autoscaling.")
  330. self.num_failures += 1
  331. if self.num_failures > self.max_failures:
  332. logger.critical("StandardAutoscaler: Too many errors, abort.")
  333. raise e
  334. def _update(self):
  335. # For type checking, assert that these objects have been instantitiated.
  336. assert self.provider
  337. assert self.resource_demand_scheduler
  338. now = time.time()
  339. # Throttle autoscaling updates to this interval to avoid exceeding
  340. # rate limits on API calls.
  341. if now - self.last_update_time < self.update_interval_s:
  342. return
  343. self.last_update_time = now
  344. # Query the provider to update the list of non-terminated nodes
  345. self.non_terminated_nodes = NonTerminatedNodes(self.provider)
  346. # Back off the update if the provider says it's not safe to proceed.
  347. if not self.provider.safe_to_scale():
  348. logger.info(
  349. "Backing off of autoscaler update."
  350. f" Will try again in {self.update_interval_s} seconds."
  351. )
  352. return
  353. # This will accumulate the nodes we need to terminate.
  354. self.nodes_to_terminate = []
  355. # Update status strings
  356. if AUTOSCALER_STATUS_LOG:
  357. logger.info(self.info_string())
  358. legacy_log_info_string(self, self.non_terminated_nodes.worker_ids)
  359. if not self.provider.is_readonly():
  360. self.terminate_nodes_to_enforce_config_constraints(now)
  361. if self.disable_node_updaters:
  362. # Don't handle unhealthy nodes if the liveness check is disabled.
  363. # self.worker_liveness_check is True by default.
  364. if self.worker_liveness_check:
  365. self.terminate_unhealthy_nodes(now)
  366. else:
  367. self.process_completed_updates()
  368. self.update_nodes()
  369. # Don't handle unhealthy nodes if the liveness check is disabled.
  370. # self.worker_liveness_check is True by default.
  371. if self.worker_liveness_check:
  372. self.attempt_to_recover_unhealthy_nodes(now)
  373. self.set_prometheus_updater_data()
  374. # Update running nodes gauge
  375. num_workers = len(self.non_terminated_nodes.worker_ids)
  376. self.prom_metrics.running_workers.set(num_workers)
  377. # Remove IPs from LoadMetrics that are not known to the NodeProvider.
  378. active_node_ips: List[str] = []
  379. for active_node_id in self.non_terminated_nodes.all_node_ids:
  380. try:
  381. active_node_ips.append(self.provider.internal_ip(active_node_id))
  382. # Catch generic Exception because different node providers
  383. # can raise different types of exceptions
  384. except Exception:
  385. logger.exception(
  386. "Failed to get ip of node with id"
  387. f" {active_node_id} when pruning IPs from LoadMetrics."
  388. )
  389. self.load_metrics.prune_active_ips(active_ips=active_node_ips)
  390. # Dict[NodeType, int], List[ResourceDict]
  391. to_launch, unfulfilled = self.resource_demand_scheduler.get_nodes_to_launch(
  392. self.non_terminated_nodes.all_node_ids,
  393. self.pending_launches.breakdown(),
  394. self.load_metrics.get_resource_demand_vector(),
  395. self.load_metrics.get_resource_utilization(),
  396. self.load_metrics.get_pending_placement_groups(),
  397. self.load_metrics.get_static_node_resources_by_ip(),
  398. ensure_min_cluster_size=self.load_metrics.get_resource_requests(),
  399. node_availability_summary=self.node_provider_availability_tracker.summary(),
  400. )
  401. self._report_pending_infeasible(unfulfilled)
  402. if not self.provider.is_readonly():
  403. self.launch_required_nodes(to_launch)
  404. # Execute optional end-of-update logic.
  405. # Keep this method call at the end of autoscaler._update().
  406. self.provider.post_process()
  407. # Record the amount of time the autoscaler took for
  408. # this _update() iteration.
  409. update_time = time.time() - self.last_update_time
  410. logger.info(
  411. f"The autoscaler took {round(update_time, 3)}"
  412. " seconds to complete the update iteration."
  413. )
  414. self.prom_metrics.update_time.observe(update_time)
    def terminate_nodes_to_enforce_config_constraints(self, now: float):
        """Terminates nodes to enforce constraints defined by the autoscaling
        config.

        (1) Terminates nodes in excess of `max_workers`.
        (2) Terminates nodes idle for longer than `idle_timeout_minutes`.
        (3) Terminates outdated nodes,
            namely nodes whose configs don't match `node_config` for the
            relevant node type.

        Avoids terminating non-outdated nodes required by
        autoscaler.sdk.request_resources().

        Nodes are first queued with ``schedule_node_termination``. They are
        then terminated together by ``terminate_scheduled_nodes`` at the end
        of this method.

        Args:
            now: Current time in seconds since the epoch (``_update`` passes
                ``time.time()``). Nodes last used before
                ``now - 60 * idle_timeout_minutes`` count as idle.
        """
        # For type checking, assert that these objects have been instantiated.
        assert self.non_terminated_nodes
        assert self.provider

        last_used = self.load_metrics.ray_nodes_last_used_time_by_ip

        idle_timeout_s = 60 * self.config["idle_timeout_minutes"]

        last_used_cutoff = now - idle_timeout_s

        # Sort based on last used to make sure to keep min_workers that
        # were most recently used. Otherwise, _keep_min_workers_of_node_type
        # might keep a node that should be terminated.
        sorted_node_ids = self._sort_based_on_last_used(
            self.non_terminated_nodes.worker_ids, last_used
        )

        # Don't terminate nodes needed by request_resources()
        nodes_not_allowed_to_terminate = self._get_nodes_needed_for_request_resources(
            sorted_node_ids
        )

        # Tracks counts of nodes we intend to keep for each node type.
        # It is passed to _keep_worker_of_node_type, so the verdict for a node
        # can depend on how many nodes of its type were already kept earlier
        # in this loop. Statement order below therefore matters.
        node_type_counts = defaultdict(int)

        def keep_node(node_id: NodeID) -> None:
            assert self.provider
            # Update per-type counts.
            tags = self.provider.node_tags(node_id)
            if TAG_RAY_USER_NODE_TYPE in tags:
                node_type = tags[TAG_RAY_USER_NODE_TYPE]
                node_type_counts[node_type] += 1

        # Nodes that we could terminate, if needed.
        nodes_we_could_terminate: List[NodeID] = []

        for node_id in sorted_node_ids:
            # Make sure to not kill idle node types if the number of workers
            # of that type is lower/equal to the min_workers of that type
            # or it is needed for request_resources().
            should_keep_or_terminate, reason = self._keep_worker_of_node_type(
                node_id, node_type_counts
            )
            if should_keep_or_terminate == KeepOrTerminate.terminate:
                self.schedule_node_termination(node_id, reason, logger.info)
                continue
            # Protected nodes (min_workers or request_resources) are kept only
            # if their launch config is current; outdated ones fall through.
            if (
                should_keep_or_terminate == KeepOrTerminate.keep
                or node_id in nodes_not_allowed_to_terminate
            ) and self.launch_config_ok(node_id):
                keep_node(node_id)
                continue

            node_ip: Optional[str] = None
            try:
                node_ip = self.provider.internal_ip(node_id)
            # Catch generic Exception because different node providers
            # can raise different types of exceptions
            except Exception:
                logger.exception(
                    "Failed to get ip of node with id"
                    f" {node_id} when finding nodes to terminate."
                )

            if (
                node_ip
                and node_ip in last_used
                and last_used[node_ip] < last_used_cutoff
            ):
                self.schedule_node_termination(node_id, "idle", logger.info)
                # Get the local time of the node's last use as a string.
                formatted_last_used_time = time.asctime(
                    time.localtime(last_used[node_ip])
                )
                logger.info(f"Node last used: {formatted_last_used_time}.")
                # Note that the current time will appear in the log prefix.
            elif not self.launch_config_ok(node_id):
                self.schedule_node_termination(node_id, "outdated", logger.info)
            else:
                # Neither idle nor outdated: kept for now, but still a
                # candidate if we end up above max_workers.
                keep_node(node_id)
                nodes_we_could_terminate.append(node_id)

        # Terminate nodes if there are too many
        num_workers = len(self.non_terminated_nodes.worker_ids)
        num_extra_nodes_to_terminate = (
            num_workers - len(self.nodes_to_terminate) - self.config["max_workers"]
        )

        if num_extra_nodes_to_terminate > len(nodes_we_could_terminate):
            logger.warning(
                "StandardAutoscaler: trying to terminate "
                f"{num_extra_nodes_to_terminate} nodes, while only "
                f"{len(nodes_we_could_terminate)} are safe to terminate."
                " Inconsistent config is likely."
            )
            num_extra_nodes_to_terminate = len(nodes_we_could_terminate)

        # If num_extra_nodes_to_terminate is negative or zero,
        # we would have less than max_workers nodes after terminating
        # nodes_to_terminate and we do not need to terminate anything else.
        if num_extra_nodes_to_terminate > 0:
            # Take candidates from the tail of the last-used ordering.
            # Presumably these are the least recently used nodes, if
            # _sort_based_on_last_used puts the most recently used first --
            # verify against that helper.
            extra_nodes_to_terminate = nodes_we_could_terminate[
                -num_extra_nodes_to_terminate:
            ]
            for node_id in extra_nodes_to_terminate:
                self.schedule_node_termination(node_id, "max workers", logger.info)

        self.terminate_scheduled_nodes()
  519. def schedule_node_termination(
  520. self, node_id: NodeID, reason_opt: Optional[str], logger_method: Callable
  521. ) -> None:
  522. # For type checking, assert that this object has been instantitiated.
  523. assert self.provider
  524. if reason_opt is None:
  525. raise Exception("reason should be not None.")
  526. reason: str = reason_opt
  527. node_ip: Optional[str] = None
  528. try:
  529. node_ip = self.provider.internal_ip(node_id)
  530. # Catch generic Exception because different node providers
  531. # can raise different types of exceptions
  532. except Exception:
  533. logger.exception(
  534. "Failed to get ip of node with id"
  535. f" {node_id} when scheduling node termination."
  536. )
  537. # Log, record an event, and add node_id to nodes_to_terminate.
  538. logger_method(
  539. "StandardAutoscaler: "
  540. f"Terminating the node with id {node_id}"
  541. f" and ip {node_ip}."
  542. f" ({reason})"
  543. )
  544. self.event_summarizer.add(
  545. "Removing {} nodes of type "
  546. + self._get_node_type(node_id)
  547. + " ({}).".format(reason),
  548. quantity=1,
  549. aggregate=operator.add,
  550. )
  551. self.nodes_to_terminate.append(node_id)
  552. def terminate_scheduled_nodes(self):
  553. """Terminate scheduled nodes and clean associated autoscaler state."""
  554. # For type checking, assert that these objects have been instantitiated.
  555. assert self.provider
  556. assert self.non_terminated_nodes
  557. if not self.nodes_to_terminate:
  558. return
  559. # Drain the nodes
  560. self.drain_nodes_via_gcs(self.nodes_to_terminate)
  561. # Terminate the nodes
  562. self.provider.terminate_nodes(self.nodes_to_terminate)
  563. for node in self.nodes_to_terminate:
  564. self.node_tracker.untrack(node)
  565. self.prom_metrics.stopped_nodes.inc()
  566. # Update internal node lists
  567. self.non_terminated_nodes.remove_terminating_nodes(self.nodes_to_terminate)
  568. self.nodes_to_terminate = []
  569. def drain_nodes_via_gcs(self, provider_node_ids_to_drain: List[NodeID]):
  570. """Send an RPC request to the GCS to drain (prepare for termination)
  571. the nodes with the given node provider ids.
  572. note: The current implementation of DrainNode on the GCS side is to
  573. de-register and gracefully shut down the Raylets. In the future,
  574. the behavior may change to better reflect the name "Drain."
  575. See https://github.com/ray-project/ray/pull/19350.
  576. """
  577. # For type checking, assert that this object has been instantitiated.
  578. assert self.provider
  579. # The GCS expects Node ids in the request, rather than NodeProvider
  580. # ids. To get the Node ids of the nodes to we're draining, we make
  581. # the following translations of identifiers:
  582. # node provider node id -> ip -> node id
  583. # Convert node provider node ids to ips.
  584. node_ips = set()
  585. failed_ip_fetch = False
  586. for provider_node_id in provider_node_ids_to_drain:
  587. # If the provider's call to fetch ip fails, the exception is not
  588. # fatal. Log the exception and proceed.
  589. try:
  590. ip = self.provider.internal_ip(provider_node_id)
  591. node_ips.add(ip)
  592. # Catch generic Exception because different node providers
  593. # can raise different types of exceptions
  594. except Exception:
  595. logger.exception(
  596. "Failed to get ip of node with id"
  597. f" {provider_node_id} during scale-down."
  598. )
  599. failed_ip_fetch = True
  600. if failed_ip_fetch:
  601. self.prom_metrics.drain_node_exceptions.inc()
  602. # Only attempt to drain connected nodes, i.e. nodes with ips in
  603. # LoadMetrics.
  604. connected_node_ips = node_ips & self.load_metrics.node_id_by_ip.keys()
  605. # Convert ips to Node ids.
  606. # (The assignment ip->node_id is well-defined under current
  607. # assumptions. See "use_node_id_as_ip" in monitor.py)
  608. node_ids_to_drain = {
  609. self.load_metrics.node_id_by_ip[ip] for ip in connected_node_ips
  610. }
  611. if not node_ids_to_drain:
  612. return
  613. logger.info(f"Draining {len(node_ids_to_drain)} raylet(s).")
  614. try:
  615. # A successful response indicates that the GCS has marked the
  616. # desired nodes as "drained." The cloud provider can then terminate
  617. # the nodes without the GCS printing an error.
  618. # Check if we succeeded in draining all of the intended nodes by
  619. # looking at the RPC response.
  620. drained_node_ids = set(
  621. self.gcs_client.drain_nodes(node_ids_to_drain, timeout=5)
  622. )
  623. failed_to_drain = node_ids_to_drain - drained_node_ids
  624. if failed_to_drain:
  625. self.prom_metrics.drain_node_exceptions.inc()
  626. logger.error(f"Failed to drain {len(failed_to_drain)} raylet(s).")
  627. # If we get a gRPC error with an UNIMPLEMENTED code, fail silently.
  628. # This error indicates that the GCS is using Ray version < 1.8.0,
  629. # for which DrainNode is not implemented.
  630. except RpcError as e:
  631. # If the code is UNIMPLEMENTED, pass.
  632. if e.rpc_code == ray._raylet.GRPC_STATUS_CODE_UNIMPLEMENTED:
  633. pass
  634. # Otherwise, it's a plain old gRPC error and we should log it.
  635. else:
  636. self.prom_metrics.drain_node_exceptions.inc()
  637. logger.exception("Failed to drain Ray nodes. Traceback follows.")
  638. except Exception:
  639. # We don't need to interrupt the autoscaler update with an
  640. # exception, but we should log what went wrong and record the
  641. # failure in Prometheus.
  642. self.prom_metrics.drain_node_exceptions.inc()
  643. logger.exception("Failed to drain Ray nodes. Traceback follows.")
  644. def launch_required_nodes(self, to_launch: Dict[NodeType, int]) -> None:
  645. if to_launch:
  646. for node_type, count in to_launch.items():
  647. self.launch_new_node(count, node_type=node_type)
  648. def update_nodes(self):
  649. """Run NodeUpdaterThreads to run setup commands, sync files,
  650. and/or start Ray.
  651. """
  652. # Update nodes with out-of-date files.
  653. # TODO(edoakes): Spawning these threads directly seems to cause
  654. # problems. They should at a minimum be spawned as daemon threads.
  655. # See https://github.com/ray-project/ray/pull/5903 for more info.
  656. T = []
  657. for node_id, setup_commands, ray_start_commands, docker_config in (
  658. self.should_update(node_id)
  659. for node_id in self.non_terminated_nodes.worker_ids
  660. ):
  661. if node_id is not None:
  662. resources = self._node_resources(node_id)
  663. labels = self._node_labels(node_id)
  664. logger.debug(f"{node_id}: Starting new thread runner.")
  665. T.append(
  666. threading.Thread(
  667. target=self.spawn_updater,
  668. args=(
  669. node_id,
  670. setup_commands,
  671. ray_start_commands,
  672. resources,
  673. labels,
  674. docker_config,
  675. ),
  676. )
  677. )
  678. for t in T:
  679. t.start()
  680. for t in T:
  681. t.join()
  682. def process_completed_updates(self):
  683. """Clean up completed NodeUpdaterThreads."""
  684. completed_nodes = []
  685. for node_id, updater in self.updaters.items():
  686. if not updater.is_alive():
  687. completed_nodes.append(node_id)
  688. if completed_nodes:
  689. failed_nodes = []
  690. for node_id in completed_nodes:
  691. updater = self.updaters[node_id]
  692. if updater.exitcode == 0:
  693. self.num_successful_updates[node_id] += 1
  694. self.prom_metrics.successful_updates.inc()
  695. if updater.for_recovery:
  696. self.prom_metrics.successful_recoveries.inc()
  697. if updater.update_time:
  698. self.prom_metrics.worker_update_time.observe(
  699. updater.update_time
  700. )
  701. # Mark the node as active to prevent the node recovery
  702. # logic immediately trying to restart Ray on the new node.
  703. node_ip: Optional[str] = None
  704. try:
  705. node_ip = self.provider.internal_ip(node_id)
  706. # Catch generic Exception because different node providers
  707. # can raise different types of exceptions
  708. except Exception:
  709. logger.exception(
  710. f"Failed to get ip of node with id {node_id} when marking node as active"
  711. )
  712. if node_ip:
  713. self.load_metrics.mark_active(node_ip)
  714. else:
  715. failed_nodes.append(node_id)
  716. self.num_failed_updates[node_id] += 1
  717. self.prom_metrics.failed_updates.inc()
  718. if updater.for_recovery:
  719. self.prom_metrics.failed_recoveries.inc()
  720. self.node_tracker.untrack(node_id)
  721. del self.updaters[node_id]
  722. if failed_nodes:
  723. # Some nodes in failed_nodes may already have been terminated
  724. # during an update (for being idle after missing a heartbeat).
  725. # Update the list of non-terminated workers.
  726. for node_id in failed_nodes:
  727. # Check if the node has already been terminated.
  728. if node_id in self.non_terminated_nodes.worker_ids:
  729. self.schedule_node_termination(
  730. node_id, "launch failed", logger.error
  731. )
  732. else:
  733. logger.warning(
  734. f"StandardAutoscaler: {node_id}:"
  735. " Failed to update node."
  736. " Node has already been terminated."
  737. )
  738. self.terminate_scheduled_nodes()
  739. def set_prometheus_updater_data(self):
  740. """Record total number of active NodeUpdaterThreads and how many of
  741. these are being run to recover nodes.
  742. """
  743. self.prom_metrics.updating_nodes.set(len(self.updaters))
  744. num_recovering = 0
  745. for updater in self.updaters.values():
  746. if updater.for_recovery:
  747. num_recovering += 1
  748. self.prom_metrics.recovering_nodes.set(num_recovering)
  749. def _report_pending_infeasible(self, unfulfilled: List[ResourceDict]):
  750. """Emit event messages for infeasible or unschedulable tasks.
  751. This adds messages to the event summarizer for warning on infeasible
  752. or "cluster full" resource requests.
  753. Args:
  754. unfulfilled: List of resource demands that would be unfulfilled
  755. even after full scale-up.
  756. """
  757. # For type checking, assert that this object has been instantitiated.
  758. assert self.resource_demand_scheduler
  759. infeasible = []
  760. for bundle in unfulfilled:
  761. placement_group = any(
  762. "_group_" in k or k == PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME
  763. for k in bundle
  764. )
  765. if placement_group:
  766. continue
  767. if not self.resource_demand_scheduler.is_feasible(bundle):
  768. infeasible.append(bundle)
  769. if infeasible:
  770. for request in infeasible:
  771. self.event_summarizer.add_once_per_interval(
  772. "Error: No available node types can fulfill resource "
  773. "request {}. Add suitable node types to this cluster to "
  774. "resolve this issue.".format(request),
  775. key="infeasible_{}".format(sorted(request.items())),
  776. interval_s=30,
  777. )
  778. def _sort_based_on_last_used(
  779. self, nodes: List[NodeID], last_used: Dict[str, float]
  780. ) -> List[NodeID]:
  781. """Sort the nodes based on the last time they were used.
  782. The first item in the return list is the most recently used.
  783. """
  784. last_used_copy = copy.deepcopy(last_used)
  785. # Add the unconnected nodes as the least recently used (the end of
  786. # list). This prioritizes connected nodes.
  787. least_recently_used = -1
  788. def last_time_used(node_id: NodeID):
  789. assert self.provider
  790. try:
  791. node_ip = self.provider.internal_ip(node_id)
  792. # Catch generic Exception because different node providers
  793. # can raise different types of exceptions
  794. except Exception:
  795. logger.exception(f"Failed to get ip of node with id {node_id}")
  796. return least_recently_used
  797. if node_ip not in last_used_copy:
  798. return least_recently_used
  799. else:
  800. return last_used_copy[node_ip]
  801. return sorted(nodes, key=last_time_used, reverse=True)
  802. def _get_nodes_needed_for_request_resources(
  803. self, sorted_node_ids: List[NodeID]
  804. ) -> FrozenSet[NodeID]:
  805. # TODO(ameer): try merging this with resource_demand_scheduler
  806. # code responsible for adding nodes for get_resource_requests() and get_pending_placement_groups().
  807. """Returns the nodes NOT allowed to terminate due to get_resource_requests() and get_pending_placement_groups().
  808. Args:
  809. sorted_node_ids: the node ids sorted based on last used (LRU last).
  810. Returns:
  811. FrozenSet[NodeID]: a set of nodes (node ids) that
  812. we should NOT terminate.
  813. """
  814. # For type checking, assert that this object has been instantitiated.
  815. assert self.provider
  816. nodes_not_allowed_to_terminate: Set[NodeID] = set()
  817. resource_demands, strict_spreads = placement_groups_to_resource_demands(
  818. self.load_metrics.get_pending_placement_groups()
  819. )
  820. resource_demands.extend(self.load_metrics.get_resource_requests())
  821. if not resource_demands and not strict_spreads:
  822. return frozenset(nodes_not_allowed_to_terminate)
  823. static_node_resources: Dict[
  824. NodeIP, ResourceDict
  825. ] = self.load_metrics.get_static_node_resources_by_ip()
  826. head_node_resources: ResourceDict = copy.deepcopy(
  827. self.available_node_types[self.config["head_node_type"]]["resources"]
  828. )
  829. # TODO(ameer): this is somewhat duplicated in
  830. # resource_demand_scheduler.py.
  831. if not head_node_resources:
  832. # Legacy yaml might include {} in the resources field.
  833. head_node_ip = self.provider.internal_ip(self.non_terminated_nodes.head_id)
  834. head_node_resources = static_node_resources.get(head_node_ip, {})
  835. node_total_resources: List[ResourceDict] = [head_node_resources]
  836. resource_demand_vector_worker_node_ids = []
  837. # Get max resources on all the non terminated nodes.
  838. for node_id in sorted_node_ids:
  839. tags = self.provider.node_tags(node_id)
  840. if TAG_RAY_USER_NODE_TYPE in tags:
  841. node_type = tags[TAG_RAY_USER_NODE_TYPE]
  842. node_resources: ResourceDict = copy.deepcopy(
  843. self.available_node_types[node_type]["resources"]
  844. )
  845. if not node_resources:
  846. # Legacy yaml might include {} in the resources field.
  847. node_ip = self.provider.internal_ip(node_id)
  848. node_resources = static_node_resources.get(node_ip, {})
  849. node_total_resources.append(node_resources)
  850. resource_demand_vector_worker_node_ids.append(node_id)
  851. # Since it is sorted based on last used, we "keep" nodes that are
  852. # most recently used when we binpack. We assume get_bin_pack_residual
  853. # is following the given order here.
  854. node_remaining_resources = copy.deepcopy(node_total_resources)
  855. for strict_spread in strict_spreads:
  856. unfulfilled, updated_node_remaining_resources = get_bin_pack_residual(
  857. node_remaining_resources, strict_spread, strict_spread=True
  858. )
  859. if unfulfilled:
  860. continue
  861. node_remaining_resources = updated_node_remaining_resources
  862. _, node_remaining_resources = get_bin_pack_residual(
  863. node_remaining_resources, resource_demands
  864. )
  865. # Remove the first entry (the head node).
  866. node_total_resources.pop(0)
  867. # Remove the first entry (the head node).
  868. node_remaining_resources.pop(0)
  869. for i, node_id in enumerate(resource_demand_vector_worker_node_ids):
  870. if (
  871. node_remaining_resources[i] == node_total_resources[i]
  872. and node_total_resources[i]
  873. ):
  874. # No resources of the node were needed for request_resources().
  875. # node_total_resources[i] is an empty dict for legacy yamls
  876. # before the node is connected.
  877. pass
  878. else:
  879. nodes_not_allowed_to_terminate.add(node_id)
  880. return frozenset(nodes_not_allowed_to_terminate)
  881. def _keep_worker_of_node_type(
  882. self, node_id: NodeID, node_type_counts: Dict[NodeType, int]
  883. ) -> Tuple[KeepOrTerminate, Optional[str]]:
  884. """Determines if a worker should be kept based on the min_workers
  885. and max_workers constraint of the worker's node_type.
  886. Returns KeepOrTerminate.keep when both of the following hold:
  887. (a) The worker's node_type is present among the keys of the current
  888. config's available_node_types dict.
  889. (b) Deleting the node would violate the min_workers constraint for that
  890. worker's node_type.
  891. Returns KeepOrTerminate.terminate when both the following hold:
  892. (a) The worker's node_type is not present among the keys of the current
  893. config's available_node_types dict.
  894. (b) Keeping the node would violate the max_workers constraint for that
  895. worker's node_type.
  896. Return KeepOrTerminate.decide_later otherwise.
  897. Args:
  898. node_type_counts(Dict[NodeType, int]): The non_terminated node
  899. types counted so far.
  900. Returns:
  901. KeepOrTerminate: keep if the node should be kept, terminate if the
  902. node should be terminated, decide_later if we are allowed
  903. to terminate it, but do not have to.
  904. Optional[str]: reason for termination. Not None on
  905. KeepOrTerminate.terminate, None otherwise.
  906. """
  907. # For type checking, assert that this object has been instantitiated.
  908. assert self.provider
  909. tags = self.provider.node_tags(node_id)
  910. if TAG_RAY_USER_NODE_TYPE in tags:
  911. node_type = tags[TAG_RAY_USER_NODE_TYPE]
  912. min_workers = self.available_node_types.get(node_type, {}).get(
  913. "min_workers", 0
  914. )
  915. max_workers = self.available_node_types.get(node_type, {}).get(
  916. "max_workers", 0
  917. )
  918. if node_type not in self.available_node_types:
  919. # The node type has been deleted from the cluster config.
  920. # Allow terminating it if needed.
  921. available_node_types = list(self.available_node_types.keys())
  922. return (
  923. KeepOrTerminate.terminate,
  924. f"not in available_node_types: {available_node_types}",
  925. )
  926. new_count = node_type_counts[node_type] + 1
  927. if new_count <= min(min_workers, max_workers):
  928. return KeepOrTerminate.keep, None
  929. if new_count > max_workers:
  930. return KeepOrTerminate.terminate, "max_workers_per_type"
  931. return KeepOrTerminate.decide_later, None
  932. def _node_resources(self, node_id):
  933. node_type = self.provider.node_tags(node_id).get(TAG_RAY_USER_NODE_TYPE)
  934. if self.available_node_types:
  935. return self.available_node_types.get(node_type, {}).get("resources", {})
  936. else:
  937. return {}
  938. def _node_labels(self, node_id):
  939. node_type = self.provider.node_tags(node_id).get(TAG_RAY_USER_NODE_TYPE)
  940. if self.available_node_types:
  941. return self.available_node_types.get(node_type, {}).get("labels", {})
  942. else:
  943. return {}
  944. def reset(self, errors_fatal=False):
  945. sync_continuously = False
  946. if hasattr(self, "config"):
  947. sync_continuously = self.config.get("file_mounts_sync_continuously", False)
  948. try:
  949. new_config = self.config_reader()
  950. if new_config != getattr(self, "config", None):
  951. try:
  952. validate_config(new_config)
  953. except Exception as e:
  954. self.prom_metrics.config_validation_exceptions.inc()
  955. logger.debug(
  956. "Cluster config validation failed. The version of "
  957. "the ray CLI you launched this cluster with may "
  958. "be higher than the version of ray being run on "
  959. "the cluster. Some new features may not be "
  960. "available until you upgrade ray on your cluster.",
  961. exc_info=e,
  962. )
  963. logger.debug(
  964. f"New config after validation: {new_config},"
  965. f" of type: {type(new_config)}"
  966. )
  967. (new_runtime_hash, new_file_mounts_contents_hash) = hash_runtime_conf(
  968. new_config["file_mounts"],
  969. new_config["cluster_synced_files"],
  970. [
  971. new_config["worker_setup_commands"],
  972. new_config["worker_start_ray_commands"],
  973. ],
  974. generate_file_mounts_contents_hash=sync_continuously,
  975. )
  976. self.config = new_config
  977. self.runtime_hash = new_runtime_hash
  978. self.file_mounts_contents_hash = new_file_mounts_contents_hash
  979. if not self.provider:
  980. self.provider = _get_node_provider(
  981. self.config["provider"], self.config["cluster_name"]
  982. )
  983. # If using the LocalNodeProvider, make sure the head node is marked
  984. # non-terminated.
  985. if isinstance(self.provider, LocalNodeProvider):
  986. record_local_head_state_if_needed(self.provider)
  987. self.available_node_types = self.config["available_node_types"]
  988. upscaling_speed = self.config.get("upscaling_speed")
  989. aggressive = self.config.get("autoscaling_mode") == "aggressive"
  990. target_utilization_fraction = self.config.get("target_utilization_fraction")
  991. if upscaling_speed:
  992. upscaling_speed = float(upscaling_speed)
  993. # TODO(ameer): consider adding (if users ask) an option of
  994. # initial_upscaling_num_workers.
  995. elif aggressive:
  996. upscaling_speed = 99999
  997. logger.warning(
  998. "Legacy aggressive autoscaling mode "
  999. "detected. Replacing it by setting upscaling_speed to "
  1000. "99999."
  1001. )
  1002. elif target_utilization_fraction:
  1003. upscaling_speed = 1 / max(target_utilization_fraction, 0.001) - 1
  1004. logger.warning(
  1005. "Legacy target_utilization_fraction config "
  1006. "detected. Replacing it by setting upscaling_speed to "
  1007. + "1 / target_utilization_fraction - 1."
  1008. )
  1009. else:
  1010. upscaling_speed = 1.0
  1011. if self.resource_demand_scheduler:
  1012. # The node types are autofilled internally for legacy yamls,
  1013. # overwriting the class will remove the inferred node resources
  1014. # for legacy yamls.
  1015. self.resource_demand_scheduler.reset_config(
  1016. self.provider,
  1017. self.available_node_types,
  1018. self.config["max_workers"],
  1019. self.config["head_node_type"],
  1020. upscaling_speed,
  1021. )
  1022. else:
  1023. self.resource_demand_scheduler = ResourceDemandScheduler(
  1024. self.provider,
  1025. self.available_node_types,
  1026. self.config["max_workers"],
  1027. self.config["head_node_type"],
  1028. upscaling_speed,
  1029. )
  1030. except Exception as e:
  1031. self.prom_metrics.reset_exceptions.inc()
  1032. if errors_fatal:
  1033. raise e
  1034. else:
  1035. logger.exception("StandardAutoscaler: Error parsing config.")
  1036. def launch_config_ok(self, node_id):
  1037. if self.disable_launch_config_check:
  1038. return True
  1039. node_tags = self.provider.node_tags(node_id)
  1040. tag_launch_conf = node_tags.get(TAG_RAY_LAUNCH_CONFIG)
  1041. node_type = node_tags.get(TAG_RAY_USER_NODE_TYPE)
  1042. if node_type not in self.available_node_types:
  1043. # The node type has been deleted from the cluster config.
  1044. # Don't keep the node.
  1045. return False
  1046. # The `worker_nodes` field is deprecated in favor of per-node-type
  1047. # node_configs. We allow it for backwards-compatibility.
  1048. launch_config = copy.deepcopy(self.config.get("worker_nodes", {}))
  1049. if node_type:
  1050. launch_config.update(
  1051. self.config["available_node_types"][node_type]["node_config"]
  1052. )
  1053. calculated_launch_hash = hash_launch_conf(launch_config, self.config["auth"])
  1054. if calculated_launch_hash != tag_launch_conf:
  1055. return False
  1056. return True
  1057. def files_up_to_date(self, node_id):
  1058. node_tags = self.provider.node_tags(node_id)
  1059. applied_config_hash = node_tags.get(TAG_RAY_RUNTIME_CONFIG)
  1060. applied_file_mounts_contents_hash = node_tags.get(TAG_RAY_FILE_MOUNTS_CONTENTS)
  1061. if applied_config_hash != self.runtime_hash or (
  1062. self.file_mounts_contents_hash is not None
  1063. and self.file_mounts_contents_hash != applied_file_mounts_contents_hash
  1064. ):
  1065. logger.info(
  1066. "StandardAutoscaler: "
  1067. "{}: Runtime state is ({},{}), want ({},{})".format(
  1068. node_id,
  1069. applied_config_hash,
  1070. applied_file_mounts_contents_hash,
  1071. self.runtime_hash,
  1072. self.file_mounts_contents_hash,
  1073. )
  1074. )
  1075. return False
  1076. return True
  1077. def heartbeat_on_time(self, node_id: NodeID, now: float) -> bool:
  1078. """Determine whether we've received a heartbeat from a node within the
  1079. last AUTOSCALER_HEARTBEAT_TIMEOUT_S seconds.
  1080. """
  1081. # For type checking, assert that this object has been instantitiated.
  1082. assert self.provider
  1083. try:
  1084. key = self.provider.internal_ip(node_id)
  1085. # Catch generic Exception because different node providers
  1086. # can raise different types of exceptions
  1087. except Exception:
  1088. logger.exception(
  1089. "Failed to get ip of node with id"
  1090. f" {node_id} when checking if heartbeat is on time"
  1091. )
  1092. # Can't figure out if we've received a heartbeat from this node
  1093. # because the IP address is not available.
  1094. return False
  1095. if key in self.load_metrics.last_heartbeat_time_by_ip:
  1096. last_heartbeat_time = self.load_metrics.last_heartbeat_time_by_ip[key]
  1097. delta = now - last_heartbeat_time
  1098. if delta < AUTOSCALER_HEARTBEAT_TIMEOUT_S:
  1099. return True
  1100. return False
  1101. def terminate_unhealthy_nodes(self, now: float):
  1102. """Terminated nodes for which we haven't received a heartbeat on time.
  1103. These nodes are subsequently terminated.
  1104. """
  1105. # For type checking, assert that these objects have been instantitiated.
  1106. assert self.provider
  1107. assert self.non_terminated_nodes
  1108. for node_id in self.non_terminated_nodes.worker_ids:
  1109. node_status = self.provider.node_tags(node_id)[TAG_RAY_NODE_STATUS]
  1110. # We're not responsible for taking down
  1111. # nodes with pending or failed status:
  1112. if not node_status == STATUS_UP_TO_DATE:
  1113. continue
  1114. # This node is up-to-date. If it hasn't had the chance to produce
  1115. # a heartbeat, fake the heartbeat now (see logic for completed node
  1116. # updaters).
  1117. ip: Optional[str] = None
  1118. try:
  1119. ip = self.provider.internal_ip(node_id)
  1120. # Catch generic Exception because different node providers
  1121. # can raise different types of exceptions
  1122. except Exception:
  1123. logger.exception(
  1124. f"Failed to get ip of node with id"
  1125. f" {node_id} when marking node as active."
  1126. )
  1127. if ip and ip not in self.load_metrics.last_heartbeat_time_by_ip:
  1128. self.load_metrics.mark_active(ip)
  1129. # Heartbeat indicates node is healthy:
  1130. if self.heartbeat_on_time(node_id, now):
  1131. continue
  1132. self.schedule_node_termination(
  1133. node_id, "lost contact with raylet", logger.warning
  1134. )
  1135. self.terminate_scheduled_nodes()
  1136. def attempt_to_recover_unhealthy_nodes(self, now):
  1137. for node_id in self.non_terminated_nodes.worker_ids:
  1138. self.recover_if_needed(node_id, now)
  1139. def recover_if_needed(self, node_id, now):
  1140. if not self.can_update(node_id):
  1141. return
  1142. if self.heartbeat_on_time(node_id, now):
  1143. return
  1144. logger.warning(
  1145. "StandardAutoscaler: "
  1146. "{}: No recent heartbeat, "
  1147. "restarting Ray to recover...".format(node_id)
  1148. )
  1149. self.event_summarizer.add(
  1150. "Restarting {} nodes of type "
  1151. + self._get_node_type(node_id)
  1152. + " (lost contact with raylet).",
  1153. quantity=1,
  1154. aggregate=operator.add,
  1155. )
  1156. head_node_ip = self.provider.internal_ip(self.non_terminated_nodes.head_id)
  1157. updater = NodeUpdaterThread(
  1158. node_id=node_id,
  1159. provider_config=self.config["provider"],
  1160. provider=self.provider,
  1161. auth_config=self.config["auth"],
  1162. cluster_name=self.config["cluster_name"],
  1163. file_mounts={},
  1164. initialization_commands=[],
  1165. setup_commands=[],
  1166. ray_start_commands=with_head_node_ip(
  1167. self.config["worker_start_ray_commands"], head_node_ip
  1168. ),
  1169. runtime_hash=self.runtime_hash,
  1170. file_mounts_contents_hash=self.file_mounts_contents_hash,
  1171. process_runner=self.process_runner,
  1172. use_internal_ip=True,
  1173. is_head_node=False,
  1174. docker_config=self._get_node_specific_docker_config(node_id),
  1175. node_resources=self._node_resources(node_id),
  1176. node_labels=self._node_labels(node_id),
  1177. for_recovery=True,
  1178. )
  1179. updater.start()
  1180. self.updaters[node_id] = updater
  1181. def _get_node_type(self, node_id: str) -> str:
  1182. # For type checking, assert that this object has been instantitiated.
  1183. assert self.provider
  1184. node_tags = self.provider.node_tags(node_id)
  1185. if TAG_RAY_USER_NODE_TYPE in node_tags:
  1186. return node_tags[TAG_RAY_USER_NODE_TYPE]
  1187. else:
  1188. return "unknown_node_type"
  1189. def _get_node_type_specific_fields(self, node_id: str, fields_key: str) -> Any:
  1190. # For type checking, assert that this object has been instantitiated.
  1191. assert self.provider
  1192. fields = self.config[fields_key]
  1193. node_tags = self.provider.node_tags(node_id)
  1194. if TAG_RAY_USER_NODE_TYPE in node_tags:
  1195. node_type = node_tags[TAG_RAY_USER_NODE_TYPE]
  1196. if node_type not in self.available_node_types:
  1197. raise ValueError(f"Unknown node type tag: {node_type}.")
  1198. node_specific_config = self.available_node_types[node_type]
  1199. if fields_key in node_specific_config:
  1200. fields = node_specific_config[fields_key]
  1201. return fields
  1202. def _get_node_specific_docker_config(self, node_id):
  1203. if "docker" not in self.config:
  1204. return {}
  1205. docker_config = copy.deepcopy(self.config.get("docker", {}))
  1206. node_specific_docker = self._get_node_type_specific_fields(node_id, "docker")
  1207. docker_config.update(node_specific_docker)
  1208. return docker_config
  1209. def should_update(self, node_id):
  1210. if not self.can_update(node_id):
  1211. return UpdateInstructions(None, None, None, None) # no update
  1212. status = self.provider.node_tags(node_id).get(TAG_RAY_NODE_STATUS)
  1213. if status == STATUS_UP_TO_DATE and self.files_up_to_date(node_id):
  1214. return UpdateInstructions(None, None, None, None) # no update
  1215. successful_updated = self.num_successful_updates.get(node_id, 0) > 0
  1216. if successful_updated and self.config.get("restart_only", False):
  1217. setup_commands = []
  1218. ray_start_commands = self.config["worker_start_ray_commands"]
  1219. elif successful_updated and self.config.get("no_restart", False):
  1220. setup_commands = self._get_node_type_specific_fields(
  1221. node_id, "worker_setup_commands"
  1222. )
  1223. ray_start_commands = []
  1224. else:
  1225. setup_commands = self._get_node_type_specific_fields(
  1226. node_id, "worker_setup_commands"
  1227. )
  1228. ray_start_commands = self.config["worker_start_ray_commands"]
  1229. docker_config = self._get_node_specific_docker_config(node_id)
  1230. return UpdateInstructions(
  1231. node_id=node_id,
  1232. setup_commands=setup_commands,
  1233. ray_start_commands=ray_start_commands,
  1234. docker_config=docker_config,
  1235. )
  1236. def spawn_updater(
  1237. self,
  1238. node_id,
  1239. setup_commands,
  1240. ray_start_commands,
  1241. node_resources,
  1242. node_labels,
  1243. docker_config,
  1244. ):
  1245. logger.info(
  1246. f"Creating new (spawn_updater) updater thread for node" f" {node_id}."
  1247. )
  1248. ip = self.provider.internal_ip(node_id)
  1249. node_type = self._get_node_type(node_id)
  1250. self.node_tracker.track(node_id, ip, node_type)
  1251. head_node_ip = self.provider.internal_ip(self.non_terminated_nodes.head_id)
  1252. updater = NodeUpdaterThread(
  1253. node_id=node_id,
  1254. provider_config=self.config["provider"],
  1255. provider=self.provider,
  1256. auth_config=self.config["auth"],
  1257. cluster_name=self.config["cluster_name"],
  1258. file_mounts=self.config["file_mounts"],
  1259. initialization_commands=with_head_node_ip(
  1260. self._get_node_type_specific_fields(node_id, "initialization_commands"),
  1261. head_node_ip,
  1262. ),
  1263. setup_commands=with_head_node_ip(setup_commands, head_node_ip),
  1264. ray_start_commands=with_head_node_ip(ray_start_commands, head_node_ip),
  1265. runtime_hash=self.runtime_hash,
  1266. file_mounts_contents_hash=self.file_mounts_contents_hash,
  1267. is_head_node=False,
  1268. cluster_synced_files=self.config["cluster_synced_files"],
  1269. rsync_options={
  1270. "rsync_exclude": self.config.get("rsync_exclude"),
  1271. "rsync_filter": self.config.get("rsync_filter"),
  1272. },
  1273. process_runner=self.process_runner,
  1274. use_internal_ip=True,
  1275. docker_config=docker_config,
  1276. node_resources=node_resources,
  1277. node_labels=node_labels,
  1278. )
  1279. updater.start()
  1280. self.updaters[node_id] = updater
  1281. def can_update(self, node_id):
  1282. if self.disable_node_updaters:
  1283. return False
  1284. if node_id in self.updaters:
  1285. return False
  1286. if not self.launch_config_ok(node_id):
  1287. return False
  1288. if self.num_failed_updates.get(node_id, 0) > 0: # TODO(ekl) retry?
  1289. return False
  1290. logger.debug(
  1291. f"{node_id} is not being updated and "
  1292. "passes config check (can_update=True)."
  1293. )
  1294. return True
  1295. def launch_new_node(self, count: int, node_type: str) -> None:
  1296. logger.info("StandardAutoscaler: Queue {} new nodes for launch".format(count))
  1297. self.pending_launches.inc(node_type, count)
  1298. config = copy.deepcopy(self.config)
  1299. if self.foreground_node_launch:
  1300. assert self.foreground_node_launcher is not None
  1301. # Launch in the main thread and block.
  1302. self.foreground_node_launcher.launch_node(config, count, node_type)
  1303. else:
  1304. assert self.launch_queue is not None
  1305. # Split into individual launch requests of the max batch size.
  1306. while count > 0:
  1307. # Enqueue launch data for the background NodeUpdater threads.
  1308. self.launch_queue.put(
  1309. (config, min(count, self.max_launch_batch), node_type)
  1310. )
  1311. count -= self.max_launch_batch
  def kill_workers(self):
      """Terminate all worker nodes returned by ``self.workers()``.

      When there is at least one worker, the workers are terminated through
      the provider in a single call. Each one is then untracked from
      ``node_tracker`` and counted in ``prom_metrics.stopped_nodes``. The
      trigger and the number of terminated nodes are logged at ERROR level.
      """
      logger.error("StandardAutoscaler: kill_workers triggered")
      worker_ids = self.workers()
      if worker_ids:
          self.provider.terminate_nodes(worker_ids)
          for worker_id in worker_ids:
              self.node_tracker.untrack(worker_id)
              self.prom_metrics.stopped_nodes.inc()
      terminated_count = len(worker_ids)
      logger.error(
          "StandardAutoscaler: terminated {} node(s)".format(terminated_count)
      )
  def summary(self) -> Optional[AutoscalerSummary]:
      """Summarize the active, pending, and failed node launches.

      A node is *active* when ``heartbeat_on_time`` reports an on-time
      heartbeat. A non-active node is *pending* while its node status tag is
      neither ``STATUS_UP_TO_DATE`` nor ``STATUS_UPDATE_FAILED`` (e.g. it is
      uninitialized, waiting for ssh, syncing files, or setting up). Any
      other managed node is left out of the non-failed set, and failed-node
      details come from ``node_tracker.get_all_failed_node_info``.

      Nodes that are missing any of the kind / user-node-type / status tags
      are skipped, as are unmanaged nodes.

      Returns:
          AutoscalerSummary: The summary, or ``None`` when
          ``non_terminated_nodes`` is not set (falsy).
      """
      # For type checking, assert that this object has been instantiated.
      assert self.provider
      if not self.non_terminated_nodes:
          return None

      # Loop invariants, built once instead of on every iteration.
      required_tags = (
          TAG_RAY_NODE_KIND,
          TAG_RAY_USER_NODE_TYPE,
          TAG_RAY_NODE_STATUS,
      )
      completed_states = (STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED)

      active_nodes: Dict[NodeType, int] = Counter()
      pending_nodes = []
      non_failed = set()
      node_type_mapping = {}
      now = time.time()
      for node_id in self.non_terminated_nodes.all_node_ids:
          node_tags = self.provider.node_tags(node_id)
          if not all(tag in node_tags for tag in required_tags):
              # In some node providers, creation of a node and tags is not
              # atomic, so just skip it.
              continue
          if node_tags[TAG_RAY_NODE_KIND] == NODE_KIND_UNMANAGED:
              continue
          # Only look up the IP for nodes that are actually summarized; it
          # is an extra provider call that skipped nodes do not need.
          ip = self.provider.internal_ip(node_id)
          node_type = node_tags[TAG_RAY_USER_NODE_TYPE]
          node_type_mapping[ip] = node_type
          if self.heartbeat_on_time(node_id, now):
              active_nodes[node_type] += 1
              non_failed.add(node_id)
              continue
          status = node_tags[TAG_RAY_NODE_STATUS]
          if status not in completed_states:
              pending_nodes.append((node_id, ip, node_type, status))
              non_failed.add(node_id)
          # Otherwise the node is neither heartbeating nor still being set
          # up, so it stays out of ``non_failed`` and is reported as failed.

      failed_nodes = self.node_tracker.get_all_failed_node_info(non_failed)

      # The concurrent counter leaves some 0 counts in, so we need to
      # manually filter those out.
      pending_launches = {
          launch_type: launch_count
          for launch_type, launch_count in self.pending_launches.breakdown().items()
          if launch_count
      }

      pending_resources = {}
      per_node_resources = self.resource_demand_scheduler.calculate_node_resources(
          nodes=[pending_id for pending_id, _, _, _ in pending_nodes],
          pending_nodes=pending_launches,
          # We don't fill this field out because we're intentionally only
          # passing pending nodes (which aren't tracked by load metrics
          # anyways).
          unused_resources_by_ip={},
      )[0]
      for node_resources in per_node_resources:
          for key, value in node_resources.items():
              pending_resources[key] = value + pending_resources.get(key, 0)

      return AutoscalerSummary(
          # Convert active_nodes from counter to dict for later serialization
          active_nodes=dict(active_nodes),
          idle_nodes=None,
          pending_nodes=[
              (ip, node_type, status) for _, ip, node_type, status in pending_nodes
          ],
          pending_launches=pending_launches,
          failed_nodes=failed_nodes,
          node_availability_summary=self.node_provider_availability_tracker.summary(),
          pending_resources=pending_resources,
          node_type_mapping=node_type_mapping,
          legacy=True,
      )
  def info_string(self):
      """Return the formatted autoscaler status report, prefixed by a newline.

      Combines ``self.load_metrics.summary()`` with ``self.summary()`` via
      ``format_info_string``.

      Raises:
          AssertionError: if ``self.summary()`` returns no summary (it
              returns ``None`` when ``non_terminated_nodes`` is not set).
              Raised explicitly rather than via ``assert`` so the check is
              not stripped under ``python -O``.
      """
      lm_summary = self.load_metrics.summary()
      autoscaler_summary = self.summary()
      if not autoscaler_summary:
          # Keep AssertionError (the type the original bare ``assert``
          # raised) so existing callers see the same exception class.
          raise AssertionError(
              "StandardAutoscaler.info_string: no autoscaler summary "
              "available (non_terminated_nodes is not set)."
          )
      return "\n" + format_info_string(lm_summary, autoscaler_summary)