node_launcher.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. import copy
  2. import logging
  3. import operator
  4. import threading
  5. import time
  6. import traceback
  7. from typing import Any, Dict, Optional
  8. from ray.autoscaler._private.node_provider_availability_tracker import (
  9. NodeProviderAvailabilityTracker,
  10. )
  11. from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
  12. from ray.autoscaler._private.util import hash_launch_conf
  13. from ray.autoscaler.node_launch_exception import NodeLaunchException
  14. from ray.autoscaler.tags import (
  15. NODE_KIND_WORKER,
  16. STATUS_UNINITIALIZED,
  17. TAG_RAY_LAUNCH_CONFIG,
  18. TAG_RAY_NODE_KIND,
  19. TAG_RAY_NODE_NAME,
  20. TAG_RAY_NODE_STATUS,
  21. TAG_RAY_USER_NODE_TYPE,
  22. )
  23. logger = logging.getLogger(__name__)
  24. class BaseNodeLauncher:
  25. """Launches Ray nodes in the main thread using
  26. `BaseNodeLauncher.launch_node()`.
  27. This is a superclass of NodeLauncher, which launches nodes asynchronously
  28. in the background.
  29. By default, the subclass NodeLauncher is used to launch nodes in subthreads.
  30. That behavior can be flagged off in the provider config by setting
  31. `foreground_node_launch: True`; the autoscaler will then makes blocking calls to
  32. BaseNodeLauncher.launch_node() in the main thread.
  33. """
  34. def __init__(
  35. self,
  36. provider,
  37. pending,
  38. event_summarizer,
  39. node_provider_availability_tracker: NodeProviderAvailabilityTracker,
  40. session_name: Optional[str] = None,
  41. prom_metrics=None,
  42. node_types=None,
  43. index=None,
  44. *args,
  45. **kwargs,
  46. ):
  47. self.pending = pending
  48. self.event_summarizer = event_summarizer
  49. self.node_provider_availability_tracker = node_provider_availability_tracker
  50. self.prom_metrics = prom_metrics or AutoscalerPrometheusMetrics(
  51. session_name=session_name
  52. )
  53. self.provider = provider
  54. self.node_types = node_types
  55. self.index = str(index) if index is not None else ""
  56. def launch_node(
  57. self, config: Dict[str, Any], count: int, node_type: str
  58. ) -> Optional[Dict]:
  59. self.log("Got {} nodes to launch.".format(count))
  60. created_nodes = self._launch_node(config, count, node_type)
  61. self.pending.dec(node_type, count)
  62. return created_nodes
  63. def _launch_node(
  64. self, config: Dict[str, Any], count: int, node_type: str
  65. ) -> Optional[Dict]:
  66. if self.node_types:
  67. assert node_type, node_type
  68. # The `worker_nodes` field is deprecated in favor of per-node-type
  69. # node_configs. We allow it for backwards-compatibility.
  70. launch_config = copy.deepcopy(config.get("worker_nodes", {}))
  71. if node_type:
  72. launch_config.update(
  73. config["available_node_types"][node_type]["node_config"]
  74. )
  75. resources = copy.deepcopy(
  76. config["available_node_types"][node_type]["resources"]
  77. )
  78. labels = copy.deepcopy(
  79. config["available_node_types"][node_type].get("labels", {})
  80. )
  81. launch_hash = hash_launch_conf(launch_config, config["auth"])
  82. node_config = copy.deepcopy(config.get("worker_nodes", {}))
  83. node_tags = {
  84. TAG_RAY_NODE_NAME: "ray-{}-worker".format(config["cluster_name"]),
  85. TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
  86. TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED,
  87. TAG_RAY_LAUNCH_CONFIG: launch_hash,
  88. }
  89. # A custom node type is specified; set the tag in this case, and also
  90. # merge the configs. We merge the configs instead of overriding, so
  91. # that the bootstrapped per-cloud properties are preserved.
  92. # TODO(ekl) this logic is duplicated in commands.py (keep in sync)
  93. if node_type:
  94. node_tags[TAG_RAY_USER_NODE_TYPE] = node_type
  95. node_config.update(launch_config)
  96. node_launch_start_time = time.time()
  97. error_msg = None
  98. full_exception = None
  99. created_nodes = {}
  100. try:
  101. created_nodes = self.provider.create_node_with_resources_and_labels(
  102. node_config, node_tags, count, resources, labels
  103. )
  104. except NodeLaunchException as node_launch_exception:
  105. self.node_provider_availability_tracker.update_node_availability(
  106. node_type, int(node_launch_start_time), node_launch_exception
  107. )
  108. if node_launch_exception.src_exc_info is not None:
  109. full_exception = "\n".join(
  110. traceback.format_exception(*node_launch_exception.src_exc_info)
  111. )
  112. error_msg = (
  113. f"Failed to launch {{}} node(s) of type {node_type}. "
  114. f"({node_launch_exception.category}): "
  115. f"{node_launch_exception.description}"
  116. )
  117. except Exception:
  118. error_msg = f"Failed to launch {{}} node(s) of type {node_type}."
  119. full_exception = traceback.format_exc()
  120. else:
  121. # Record some metrics/observability information when a node is launched.
  122. launch_time = time.time() - node_launch_start_time
  123. for _ in range(count):
  124. # Note: when launching multiple nodes we observe the time it
  125. # took all nodes to launch for each node. For example, if 4
  126. # nodes were created in 25 seconds, we would observe the 25
  127. # second create time 4 times.
  128. self.prom_metrics.worker_create_node_time.observe(launch_time)
  129. self.prom_metrics.started_nodes.inc(count)
  130. self.node_provider_availability_tracker.update_node_availability(
  131. node_type=node_type,
  132. timestamp=int(node_launch_start_time),
  133. node_launch_exception=None,
  134. )
  135. if error_msg is not None:
  136. self.event_summarizer.add(
  137. error_msg,
  138. quantity=count,
  139. aggregate=operator.add,
  140. )
  141. self.log(error_msg)
  142. self.prom_metrics.node_launch_exceptions.inc()
  143. self.prom_metrics.failed_create_nodes.inc(count)
  144. else:
  145. self.log("Launching {} nodes, type {}.".format(count, node_type))
  146. self.event_summarizer.add(
  147. "Adding {} node(s) of type " + str(node_type) + ".",
  148. quantity=count,
  149. aggregate=operator.add,
  150. )
  151. if full_exception is not None:
  152. self.log(full_exception)
  153. return created_nodes
  154. def log(self, statement):
  155. # launcher_class is "BaseNodeLauncher", or "NodeLauncher" if called
  156. # from that subclass.
  157. launcher_class: str = type(self).__name__
  158. prefix = "{}{}:".format(launcher_class, self.index)
  159. logger.info(prefix + " {}".format(statement))
  160. class NodeLauncher(BaseNodeLauncher, threading.Thread):
  161. """Launches nodes asynchronously in the background."""
  162. def __init__(
  163. self,
  164. provider,
  165. queue,
  166. pending,
  167. event_summarizer,
  168. node_provider_availability_tracker,
  169. session_name: Optional[str] = None,
  170. prom_metrics=None,
  171. node_types=None,
  172. index=None,
  173. *thread_args,
  174. **thread_kwargs,
  175. ):
  176. self.queue = queue
  177. BaseNodeLauncher.__init__(
  178. self,
  179. provider=provider,
  180. pending=pending,
  181. event_summarizer=event_summarizer,
  182. session_name=session_name,
  183. node_provider_availability_tracker=node_provider_availability_tracker,
  184. prom_metrics=prom_metrics,
  185. node_types=node_types,
  186. index=index,
  187. )
  188. threading.Thread.__init__(self, *thread_args, **thread_kwargs)
  189. def run(self):
  190. """Collects launch data from queue populated by StandardAutoscaler.
  191. Launches nodes in a background thread.
  192. Overrides threading.Thread.run().
  193. NodeLauncher.start() executes this loop in a background thread.
  194. """
  195. while True:
  196. config, count, node_type = self.queue.get()
  197. # launch_node is implemented in BaseNodeLauncher
  198. self.launch_node(config, count, node_type)