# event_logger.py (8.0 KB)
  1. import logging
  2. from collections import defaultdict
  3. from typing import Dict, List, Optional
  4. from ray._private.event.event_logger import EventLoggerAdapter
  5. from ray.autoscaler.v2.utils import ResourceRequestUtil
  6. from ray.core.generated.autoscaler_pb2 import (
  7. ClusterResourceConstraint,
  8. GangResourceRequest,
  9. ResourceRequest,
  10. )
  11. from ray.core.generated.common_pb2 import LabelSelectorOperator
  12. from ray.core.generated.instance_manager_pb2 import LaunchRequest, TerminationRequest
# Module-level logger; node launch/termination messages are written here in
# addition to the autoscaler event logger.
logger = logging.getLogger(__name__)
  14. class AutoscalerEventLogger:
  15. """
  16. Logs events related to the autoscaler.
  17. # TODO:
  18. - Add more logging for other events.
  19. - Rate limit the events if too spammy.
  20. """
  21. def __init__(self, logger: EventLoggerAdapter):
  22. self._logger = logger
  23. def log_cluster_scheduling_update(
  24. self,
  25. cluster_resources: Dict[str, float],
  26. launch_requests: Optional[List[LaunchRequest]] = None,
  27. terminate_requests: Optional[List[TerminationRequest]] = None,
  28. infeasible_requests: Optional[List[ResourceRequest]] = None,
  29. infeasible_gang_requests: Optional[List[GangResourceRequest]] = None,
  30. infeasible_cluster_resource_constraints: Optional[
  31. List[ClusterResourceConstraint]
  32. ] = None,
  33. ) -> None:
  34. """
  35. Log updates to the autoscaler scheduling state.
  36. Emits:
  37. - info logs for node launches and terminations (counts grouped by node type).
  38. - an info log summarizing the cluster size after a resize (CPUs/GPUs/TPUs).
  39. - warnings describing infeasible single resource requests, infeasible gang
  40. (placement group) requests, and infeasible cluster resource constraints.
  41. Args:
  42. cluster_resources: Mapping of resource name to total resources for the
  43. current cluster state.
  44. launch_requests: Node launch requests issued in this scheduling step.
  45. terminate_requests: Node termination requests issued in this scheduling
  46. step.
  47. infeasible_requests: Resource requests that could not be satisfied by
  48. any available node type.
  49. infeasible_gang_requests: Gang/placement group requests that could not
  50. be scheduled.
  51. infeasible_cluster_resource_constraints: Cluster-level resource
  52. constraints that could not be satisfied.
  53. Returns:
  54. None
  55. """
  56. # Log any launch events.
  57. if launch_requests:
  58. launch_type_count = defaultdict(int)
  59. for req in launch_requests:
  60. launch_type_count[req.instance_type] += req.count
  61. for idx, (instance_type, count) in enumerate(launch_type_count.items()):
  62. log_str = f"Adding {count} node(s) of type {instance_type}."
  63. self._logger.info(f"{log_str}")
  64. logger.info(f"{log_str}")
  65. # Log any terminate events.
  66. if terminate_requests:
  67. termination_by_causes_and_type = defaultdict(int)
  68. for req in terminate_requests:
  69. termination_by_causes_and_type[(req.cause, req.instance_type)] += 1
  70. cause_reason_map = {
  71. TerminationRequest.Cause.OUTDATED: "outdated",
  72. TerminationRequest.Cause.MAX_NUM_NODES: "max number of worker nodes reached", # noqa
  73. TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE: "max number of worker nodes per type reached", # noqa
  74. TerminationRequest.Cause.IDLE: "idle",
  75. }
  76. for idx, ((cause, instance_type), count) in enumerate(
  77. termination_by_causes_and_type.items()
  78. ):
  79. log_str = f"Removing {count} nodes of type {instance_type} ({cause_reason_map[cause]})." # noqa
  80. self._logger.info(f"{log_str}")
  81. logger.info(f"{log_str}")
  82. # Cluster shape changes.
  83. if launch_requests or terminate_requests:
  84. num_cpus = cluster_resources.get("CPU", 0)
  85. log_str = f"Resized to {int(num_cpus)} CPUs"
  86. if "GPU" in cluster_resources:
  87. log_str += f", {int(cluster_resources['GPU'])} GPUs"
  88. if "TPU" in cluster_resources:
  89. log_str += f", {int(cluster_resources['TPU'])} TPUs"
  90. self._logger.info(f"{log_str}.")
  91. self._logger.debug(f"Current cluster resources: {dict(cluster_resources)}.")
  92. # Log any infeasible requests.
  93. if infeasible_requests:
  94. requests_by_count = ResourceRequestUtil.group_by_count(infeasible_requests)
  95. log_str = "No available node types can fulfill resource requests "
  96. for idx, req_count in enumerate(requests_by_count):
  97. resource_map = ResourceRequestUtil.to_resource_map(req_count.request)
  98. log_str += f"{resource_map}*{req_count.count}"
  99. if idx < len(requests_by_count) - 1:
  100. log_str += ", "
  101. # Parse and log label selectors if present
  102. if req_count.request.label_selectors:
  103. selector_strs = []
  104. for selector in req_count.request.label_selectors:
  105. for constraint in selector.label_constraints:
  106. op = LabelSelectorOperator.Name(constraint.operator)
  107. values = ",".join(constraint.label_values)
  108. selector_strs.append(
  109. f"{constraint.label_key} {op} [{values}]"
  110. )
  111. if selector_strs:
  112. log_str += (
  113. " with label selectors: [" + "; ".join(selector_strs) + "]"
  114. )
  115. log_str += (
  116. ". Add suitable node types to this cluster to resolve this issue."
  117. )
  118. self._logger.warning(log_str)
  119. if infeasible_gang_requests:
  120. # Log for each placement group requests.
  121. for gang_request in infeasible_gang_requests:
  122. log_str = (
  123. "No available node types can fulfill "
  124. "placement group requests (detail={details}): ".format(
  125. details=gang_request.details
  126. )
  127. )
  128. requests_by_count = ResourceRequestUtil.group_by_count(
  129. gang_request.requests
  130. )
  131. for idx, req_count in enumerate(requests_by_count):
  132. resource_map = ResourceRequestUtil.to_resource_map(
  133. req_count.request
  134. )
  135. log_str += f"{resource_map}*{req_count.count}"
  136. if idx < len(requests_by_count) - 1:
  137. log_str += ", "
  138. log_str += (
  139. ". Add suitable node types to this cluster to resolve this issue."
  140. )
  141. self._logger.warning(log_str)
  142. if infeasible_cluster_resource_constraints:
  143. # We will only have max 1 cluster resource constraint for now since it's
  144. # from `request_resources()` sdk, where the most recent call would override
  145. # the previous one.
  146. for infeasible_constraint in infeasible_cluster_resource_constraints:
  147. log_str = "No available node types can fulfill cluster constraint: "
  148. for i, requests_by_count in enumerate(
  149. infeasible_constraint.resource_requests
  150. ):
  151. resource_map = ResourceRequestUtil.to_resource_map(
  152. requests_by_count.request
  153. )
  154. log_str += f"{resource_map}*{requests_by_count.count}"
  155. if i < len(infeasible_constraint.resource_requests) - 1:
  156. log_str += ", "
  157. log_str += (
  158. ". Add suitable node types to this cluster to resolve this issue."
  159. )
  160. self._logger.warning(log_str)