| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- import copy
- import logging
- import operator
- import threading
- import time
- import traceback
- from typing import Any, Dict, Optional
- from ray.autoscaler._private.node_provider_availability_tracker import (
- NodeProviderAvailabilityTracker,
- )
- from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
- from ray.autoscaler._private.util import hash_launch_conf
- from ray.autoscaler.node_launch_exception import NodeLaunchException
- from ray.autoscaler.tags import (
- NODE_KIND_WORKER,
- STATUS_UNINITIALIZED,
- TAG_RAY_LAUNCH_CONFIG,
- TAG_RAY_NODE_KIND,
- TAG_RAY_NODE_NAME,
- TAG_RAY_NODE_STATUS,
- TAG_RAY_USER_NODE_TYPE,
- )
logger = logging.getLogger(__name__)


class BaseNodeLauncher:
    """Launches Ray nodes in the main thread using
    `BaseNodeLauncher.launch_node()`.

    This is a superclass of NodeLauncher, which launches nodes asynchronously
    in the background.

    By default, the subclass NodeLauncher is used to launch nodes in subthreads.
    That behavior can be flagged off in the provider config by setting
    `foreground_node_launch: True`; the autoscaler will then make blocking
    calls to BaseNodeLauncher.launch_node() in the main thread.
    """

    def __init__(
        self,
        provider,
        pending,
        event_summarizer,
        node_provider_availability_tracker: NodeProviderAvailabilityTracker,
        session_name: Optional[str] = None,
        prom_metrics=None,
        node_types=None,
        index=None,
        *args,
        **kwargs,
    ):
        """
        Args:
            provider: Node provider used to create nodes; must implement
                ``create_node_with_resources_and_labels``.
            pending: Pending-launch tracker; ``pending.dec(node_type, count)``
                is called exactly once per ``launch_node`` call.
            event_summarizer: Receives launch/failure events via
                ``add(template, quantity=..., aggregate=...)``.
            node_provider_availability_tracker: Updated with the outcome of
                every provider call (success or NodeLaunchException).
            session_name: Only used to build the default Prometheus metrics
                when ``prom_metrics`` is not supplied.
            prom_metrics: Metrics sink; a new ``AutoscalerPrometheusMetrics``
                is created when this is falsy.
            node_types: When truthy, every launch must name a node type.
            index: Optional identifier appended to the class name in the log
                prefix.
            *args, **kwargs: Accepted and ignored.
        """
        self.pending = pending
        self.event_summarizer = event_summarizer
        self.node_provider_availability_tracker = node_provider_availability_tracker
        self.prom_metrics = prom_metrics or AutoscalerPrometheusMetrics(
            session_name=session_name
        )
        self.provider = provider
        self.node_types = node_types
        self.index = str(index) if index is not None else ""

    def launch_node(
        self, config: Dict[str, Any], count: int, node_type: str
    ) -> Optional[Dict]:
        """Launch ``count`` nodes of ``node_type`` and settle the pending count.

        Returns:
            Whatever ``_launch_node`` returns (the provider's result, or
            ``{}`` when the provider call failed).
        """
        self.log("Got {} nodes to launch.".format(count))
        try:
            return self._launch_node(config, count, node_type)
        finally:
            # Release the pending reservation even if building the launch
            # request raised (e.g. a key missing from `config`); otherwise the
            # pending count for these nodes would never go back down.
            self.pending.dec(node_type, count)

    def _launch_node(
        self, config: Dict[str, Any], count: int, node_type: str
    ) -> Optional[Dict]:
        """Build the node config and tags, then ask the provider for nodes.

        Errors raised by the provider call are caught and reported to the
        event summarizer, the log, the Prometheus counters and the
        availability tracker. Errors raised while building the request from
        ``config`` propagate to the caller.

        Returns:
            The provider's result on success, or ``{}`` if the provider call
            raised.
        """
        if self.node_types:
            assert node_type, node_type

        # The `worker_nodes` field is deprecated in favor of per-node-type
        # node_configs. We allow it for backwards-compatibility.
        launch_config = copy.deepcopy(config.get("worker_nodes", {}))
        if node_type:
            launch_config.update(
                config["available_node_types"][node_type]["node_config"]
            )
        # NOTE(review): unlike the node_config merge above, this lookup is not
        # guarded by `if node_type:`; a falsy node_type raises KeyError here
        # unless available_node_types contains that key.
        node_type_config = config["available_node_types"][node_type]
        resources = copy.deepcopy(node_type_config["resources"])
        labels = copy.deepcopy(node_type_config.get("labels", {}))
        launch_hash = hash_launch_conf(launch_config, config["auth"])
        node_config = copy.deepcopy(config.get("worker_nodes", {}))
        node_tags = {
            TAG_RAY_NODE_NAME: "ray-{}-worker".format(config["cluster_name"]),
            TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
            TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED,
            TAG_RAY_LAUNCH_CONFIG: launch_hash,
        }
        # A custom node type is specified; set the tag in this case, and also
        # merge the configs. We merge the configs instead of overriding, so
        # that the bootstrapped per-cloud properties are preserved.
        # TODO(ekl) this logic is duplicated in commands.py (keep in sync)
        if node_type:
            node_tags[TAG_RAY_USER_NODE_TYPE] = node_type
            node_config.update(launch_config)

        node_launch_start_time = time.time()

        error_msg = None
        full_exception = None
        created_nodes = {}
        try:
            created_nodes = self.provider.create_node_with_resources_and_labels(
                node_config, node_tags, count, resources, labels
            )
        except NodeLaunchException as node_launch_exception:
            self.node_provider_availability_tracker.update_node_availability(
                node_type, int(node_launch_start_time), node_launch_exception
            )

            if node_launch_exception.src_exc_info is not None:
                full_exception = "\n".join(
                    traceback.format_exception(*node_launch_exception.src_exc_info)
                )

            # "{{}}" renders as a literal "{}" placeholder for the node count;
            # presumably the event summarizer formats it with the aggregated
            # quantity.
            error_msg = (
                f"Failed to launch {{}} node(s) of type {node_type}. "
                f"({node_launch_exception.category}): "
                f"{node_launch_exception.description}"
            )
        except Exception:
            error_msg = f"Failed to launch {{}} node(s) of type {node_type}."
            full_exception = traceback.format_exc()
        else:
            # Record some metrics/observability information when a node is launched.
            launch_time = time.time() - node_launch_start_time
            for _ in range(count):
                # Note: when launching multiple nodes we observe the time it
                # took all nodes to launch for each node. For example, if 4
                # nodes were created in 25 seconds, we would observe the 25
                # second create time 4 times.
                self.prom_metrics.worker_create_node_time.observe(launch_time)
            self.prom_metrics.started_nodes.inc(count)
            self.node_provider_availability_tracker.update_node_availability(
                node_type=node_type,
                timestamp=int(node_launch_start_time),
                node_launch_exception=None,
            )

        if error_msg is not None:
            self.event_summarizer.add(
                error_msg,
                quantity=count,
                aggregate=operator.add,
            )
            # Fill in the count placeholder for the log line. str.replace is
            # used instead of str.format so that braces inside a
            # provider-supplied description cannot raise here; the first "{}"
            # is always the count placeholder built above.
            self.log(error_msg.replace("{}", str(count), 1))
            self.prom_metrics.node_launch_exceptions.inc()
            self.prom_metrics.failed_create_nodes.inc(count)
        else:
            self.log("Launching {} nodes, type {}.".format(count, node_type))
            self.event_summarizer.add(
                "Adding {} node(s) of type " + str(node_type) + ".",
                quantity=count,
                aggregate=operator.add,
            )

        if full_exception is not None:
            self.log(full_exception)

        return created_nodes

    def log(self, statement):
        """Log ``statement`` at INFO level, prefixed with class name and index."""
        # launcher_class is "BaseNodeLauncher", or "NodeLauncher" if called
        # from that subclass.
        launcher_class: str = type(self).__name__
        prefix = "{}{}:".format(launcher_class, self.index)
        logger.info(prefix + " {}".format(statement))
class NodeLauncher(BaseNodeLauncher, threading.Thread):
    """Launches nodes asynchronously in the background."""

    def __init__(
        self,
        provider,
        queue,
        pending,
        event_summarizer,
        node_provider_availability_tracker,
        session_name: Optional[str] = None,
        prom_metrics=None,
        node_types=None,
        index=None,
        *thread_args,
        **thread_kwargs,
    ):
        """
        Args:
            queue: Source of ``(config, count, node_type)`` launch requests;
                ``run()`` consumes it with blocking ``queue.get()`` calls.
            *thread_args, **thread_kwargs: Forwarded to
                ``threading.Thread.__init__``.

        All other arguments are forwarded to ``BaseNodeLauncher.__init__``.
        """
        self.queue = queue
        BaseNodeLauncher.__init__(
            self,
            provider=provider,
            pending=pending,
            event_summarizer=event_summarizer,
            session_name=session_name,
            node_provider_availability_tracker=node_provider_availability_tracker,
            prom_metrics=prom_metrics,
            node_types=node_types,
            index=index,
        )
        threading.Thread.__init__(self, *thread_args, **thread_kwargs)

    def run(self):
        """Collects launch data from queue populated by StandardAutoscaler.

        Launches nodes in a background thread.
        Overrides threading.Thread.run().
        NodeLauncher.start() executes this loop in a background thread.

        An unexpected exception raised while handling one request is logged
        with its traceback, and the loop moves on to the next request.
        """
        while True:
            config, count, node_type = self.queue.get()
            try:
                # launch_node is implemented in BaseNodeLauncher
                self.launch_node(config, count, node_type)
            except Exception:
                # Letting the exception escape would end run(), and with it
                # this thread; every request queued afterwards would then sit
                # in the queue unprocessed.
                self.log(
                    "Unexpected error while launching {} node(s) of type {}:\n{}".format(
                        count, node_type, traceback.format_exc()
                    )
                )
|