# schema.py
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple

from ray.autoscaler.v2.instance_manager.common import InstanceUtil
from ray.core.generated.autoscaler_pb2 import NodeState, NodeStatus
from ray.core.generated.instance_manager_pb2 import Instance
  8. # TODO(rickyx): once we have graceful shutdown, we could populate
  9. # the failure detail with the actual termination message. As of now,
  10. # we will use a more generic message to include cases such as:
  11. # (idle termination, node death, crash, preemption, etc)
  12. NODE_DEATH_CAUSE_RAYLET_DIED = "NodeTerminated"
  13. # e.g., cpu_4_ondemand.
  14. NodeType = str
  15. @dataclass
  16. class ResourceUsage:
  17. # Resource name.
  18. resource_name: str = ""
  19. # Total resource.
  20. total: float = 0.0
  21. # Resource used.
  22. used: float = 0.0
  23. @dataclass
  24. class NodeUsage:
  25. # The node resource usage.
  26. usage: List[ResourceUsage]
  27. # How long the node has been idle.
  28. idle_time_ms: int
  29. @dataclass
  30. class NodeInfo:
  31. # The instance type name, e.g. p3.2xlarge
  32. instance_type_name: str
  33. # ray node type name.
  34. ray_node_type_name: str
  35. # Cloud instance id.
  36. instance_id: str
  37. # Ip address of the node when alive.
  38. ip_address: str
  39. # The status of the node. Optional for pending nodes.
  40. node_status: Optional[str] = None
  41. # ray node id in hex. None if still pending.
  42. node_id: Optional[str] = None
  43. # Resource usage breakdown if node is running.
  44. resource_usage: Optional[NodeUsage] = None
  45. # Failure detail if the node failed.
  46. failure_detail: Optional[str] = None
  47. # Descriptive details.
  48. details: Optional[str] = None
  49. # Activity on the node.
  50. node_activity: Optional[List[str]] = None
  51. # Ray node labels.
  52. labels: Optional[Dict[str, str]] = None
  53. def total_resources(self) -> Dict[str, float]:
  54. if self.resource_usage is None:
  55. return {}
  56. return {r.resource_name: r.total for r in self.resource_usage.usage}
  57. def available_resources(self) -> Dict[str, float]:
  58. if self.resource_usage is None:
  59. return {}
  60. return {r.resource_name: r.total - r.used for r in self.resource_usage.usage}
  61. def used_resources(self) -> Dict[str, float]:
  62. if self.resource_usage is None:
  63. return {}
  64. return {r.resource_name: r.used for r in self.resource_usage.usage}
  65. @dataclass
  66. class LaunchRequest:
  67. class Status(Enum):
  68. FAILED = "FAILED"
  69. PENDING = "PENDING"
  70. # The instance type name, e.g. p3.2xlarge
  71. instance_type_name: str
  72. # ray node type name.
  73. ray_node_type_name: str
  74. # count.
  75. count: int
  76. # State: (e.g. PENDING, FAILED)
  77. state: Status
  78. # When the launch request was made in unix timestamp in secs.
  79. request_ts_s: int
  80. # When the launch request failed unix timestamp in secs if failed.
  81. failed_ts_s: Optional[int] = None
  82. # Request details, e.g. error reason if the launch request failed.
  83. details: Optional[str] = None
  84. @dataclass
  85. class ResourceRequestByCount:
  86. # Bundles in the demand.
  87. bundle: Dict[str, float]
  88. # Number of bundles with the same shape.
  89. count: int
  90. def __str__(self) -> str:
  91. return f"[{self.count} {self.bundle}]"
  92. @dataclass
  93. class ResourceDemand:
  94. # The bundles in the demand with shape and count info.
  95. bundles_by_count: List[ResourceRequestByCount]
  96. @dataclass
  97. class PlacementGroupResourceDemand(ResourceDemand):
  98. # Details string (parsed into below information)
  99. details: str
  100. # Placement group's id.
  101. pg_id: Optional[str] = None
  102. # Strategy, e.g. STRICT_SPREAD
  103. strategy: Optional[str] = None
  104. # Placement group's state, e.g. PENDING
  105. state: Optional[str] = None
  106. def __post_init__(self):
  107. if not self.details:
  108. return
  109. # Details in the format of <pg_id>:<strategy>|<state>, parse
  110. # it into the above fields.
  111. pattern = r"^.*:.*\|.*$"
  112. match = re.match(pattern, self.details)
  113. if not match:
  114. return
  115. pg_id, details = self.details.split(":")
  116. strategy, state = details.split("|")
  117. self.pg_id = pg_id
  118. self.strategy = strategy
  119. self.state = state
  120. @dataclass
  121. class RayTaskActorDemand(ResourceDemand):
  122. pass
  123. @dataclass
  124. class ClusterConstraintDemand(ResourceDemand):
  125. pass
  126. @dataclass
  127. class ResourceDemandSummary:
  128. # Placement group demand.
  129. placement_group_demand: List[PlacementGroupResourceDemand] = field(
  130. default_factory=list
  131. )
  132. # Ray task actor demand.
  133. ray_task_actor_demand: List[RayTaskActorDemand] = field(default_factory=list)
  134. # Cluster constraint demand.
  135. cluster_constraint_demand: List[ClusterConstraintDemand] = field(
  136. default_factory=list
  137. )
  138. @dataclass
  139. class Stats:
  140. # How long it took to get the GCS request.
  141. # This is required when initializing the Stats since it should be calculated before
  142. # the request was made.
  143. gcs_request_time_s: float
  144. # How long it took to get all live instances from node provider.
  145. none_terminated_node_request_time_s: Optional[float] = None
  146. # How long for autoscaler to process the scaling decision.
  147. autoscaler_iteration_time_s: Optional[float] = None
  148. # The last seen autoscaler state version from Ray.
  149. autoscaler_version: Optional[str] = None
  150. # The last seen cluster state resource version.
  151. cluster_resource_state_version: Optional[str] = None
  152. # Request made time unix timestamp: when the data was pulled from GCS.
  153. request_ts_s: Optional[int] = None
  154. @dataclass
  155. class ClusterStatus:
  156. # Healthy nodes information (non-idle)
  157. active_nodes: List[NodeInfo] = field(default_factory=list)
  158. # Idle node information
  159. idle_nodes: List[NodeInfo] = field(default_factory=list)
  160. # Pending launches.
  161. pending_launches: List[LaunchRequest] = field(default_factory=list)
  162. # Failed launches.
  163. failed_launches: List[LaunchRequest] = field(default_factory=list)
  164. # Pending nodes.
  165. pending_nodes: List[NodeInfo] = field(default_factory=list)
  166. # Failures
  167. failed_nodes: List[NodeInfo] = field(default_factory=list)
  168. # Resource usage summary for entire cluster.
  169. cluster_resource_usage: List[ResourceUsage] = field(default_factory=list)
  170. # Demand summary.
  171. resource_demands: ResourceDemandSummary = field(
  172. default_factory=ResourceDemandSummary
  173. )
  174. # Query metics
  175. stats: Stats = field(default_factory=Stats)
  176. def total_resources(self) -> Dict[str, float]:
  177. return {r.resource_name: r.total for r in self.cluster_resource_usage}
  178. def available_resources(self) -> Dict[str, float]:
  179. return {r.resource_name: r.total - r.used for r in self.cluster_resource_usage}
  180. # TODO(rickyx): we don't show infeasible requests as of now.
  181. # (They will just be pending forever as part of the demands)
  182. # We should show them properly in the future.
  183. @dataclass
  184. class AutoscalerInstance:
  185. """
  186. AutoscalerInstance represents an instance that's managed by the autoscaler.
  187. This includes two states:
  188. 1. the instance manager state: information of the underlying cloud instance.
  189. 2. the ray node state, e.g. resources, ray node status.
  190. The two states are linked by the cloud instance id, which should be set
  191. when the ray node is started.
  192. """
  193. # The cloud instance id. It could be None if the instance hasn't been assigned
  194. # a cloud instance id, e.g. the instance is still in QUEUED or REQUESTED status.
  195. cloud_instance_id: Optional[str] = None
  196. # The ray node state status. It could be None when no ray node is running
  197. # or has run on the cloud instance: for example, ray is still being installed
  198. # or the instance manager hasn't had a cloud instance assigned (e.g. QUEUED,
  199. # REQUESTED).
  200. ray_node: Optional[NodeState] = None
  201. # The instance manager instance state. It would be None when the ray_node is not
  202. # None.
  203. # It could be None iff:
  204. # 1. There's a ray node, but the instance manager hasn't discovered the
  205. # cloud instance that's running this ray process yet. This could happen since
  206. # the instance manager only discovers instances periodically.
  207. #
  208. # 2. There was a ray node running on the cloud instance, which was already stopped
  209. # and removed from the instance manager state. But the ray state is still lagging
  210. # behind.
  211. #
  212. # 3. There is a ray node that's unmanaged by the instance manager.
  213. #
  214. im_instance: Optional[Instance] = None
  215. # | cloud_instance_id | ray_node | im_instance |
  216. # |-------------------|----------|-------------|
  217. # | None | None | None | Not possible.
  218. # | None | None | not None | OK. An instance hasn't had ray running on it yet. # noqa E501
  219. # | None | Not None | None | OK. Possible if the ray node is not started by autoscaler. # noqa E501
  220. # | None | Not None | not None | Not possible - no way to link im instance with ray node. # noqa E501
  221. # | not None | None | None | Not possible since cloud instance id is either part of im state or ray node. # noqa E501
  222. # | not None | None | not None | OK. e.g. An instance that's not running ray yet. # noqa E501
  223. # | not None | Not None | None | OK. See scenario 1, 2, 3 above.
  224. # | not None | Not None | not None | OK. An instance that's running ray.
  225. def validate(self) -> Tuple[bool, str]:
  226. """Validate the autoscaler instance state.
  227. Returns:
  228. A tuple of (valid, error_msg) where:
  229. - valid is whether the state is valid
  230. - error_msg is the error message for the validation results.
  231. """
  232. state_combinations = {
  233. # (cloud_instance_id is None, ray_node is None, im_instance is None): (valid, error_msg) # noqa E501
  234. (True, True, True): (False, "Not possible"),
  235. (True, True, False): (True, ""),
  236. (True, False, True): (
  237. True,
  238. "There's a ray node w/o cloud instance id, must be started not "
  239. "by autoscaler",
  240. ),
  241. (True, False, False): (
  242. False,
  243. "Not possible - no way to link im instance with ray node",
  244. ),
  245. (False, True, True): (
  246. False,
  247. "Not possible since cloud instance id is either part of "
  248. "im state or ray node",
  249. ),
  250. (False, True, False): (True, ""),
  251. (False, False, True): (True, ""),
  252. (False, False, False): (True, ""),
  253. }
  254. valid, error_msg = state_combinations[
  255. (
  256. self.cloud_instance_id is None,
  257. self.ray_node is None,
  258. self.im_instance is None,
  259. )
  260. ]
  261. if not valid:
  262. return valid, error_msg
  263. if self.im_instance is not None and self.ray_node is None:
  264. # We don't see a ray node, but tracking an im instance.
  265. if self.cloud_instance_id is None:
  266. if InstanceUtil.is_cloud_instance_allocated(self.im_instance.status):
  267. return (
  268. False,
  269. "instance should be in a status where cloud instance "
  270. "is not allocated.",
  271. )
  272. else:
  273. if not InstanceUtil.is_cloud_instance_allocated(
  274. self.im_instance.status
  275. ):
  276. return (
  277. False,
  278. "instance should be in a status where cloud instance is "
  279. "allocated.",
  280. )
  281. if self.ray_node is not None:
  282. if self.cloud_instance_id != self.ray_node.instance_id:
  283. return False, "cloud instance id doesn't match."
  284. if self.im_instance is not None and self.cloud_instance_id is not None:
  285. if self.cloud_instance_id != self.im_instance.cloud_instance_id:
  286. return False, "cloud instance id doesn't match."
  287. return True, ""
  288. def is_ray_running(self) -> bool:
  289. """Whether the ray node is running."""
  290. return self.ray_node is not None and self.ray_node.status in [
  291. NodeStatus.RUNNING,
  292. NodeStatus.IDLE,
  293. ]
  294. def is_ray_stop(self) -> bool:
  295. """Whether the ray node is stopped."""
  296. return self.ray_node is None or self.ray_node.status in [
  297. NodeStatus.DEAD,
  298. ]