datacenter.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. import logging
  2. from typing import List, Optional
  3. import ray.dashboard.consts as dashboard_consts
  4. from ray._common.utils import (
  5. get_or_create_event_loop,
  6. )
  7. from ray._private.utils import (
  8. parse_pg_formatted_resources_to_original,
  9. )
  10. from ray.dashboard.utils import (
  11. async_loop_forever,
  12. compose_state_message,
  13. )
  14. logger = logging.getLogger(__name__)
  15. class DataSource:
  16. # {node id hex(str): node stats(dict of GetNodeStatsReply
  17. # in node_manager.proto)}
  18. node_stats = {}
  19. # {node id hex(str): node physical stats(dict from reporter_agent.py)}
  20. node_physical_stats = {}
  21. # {actor id hex(str): actor table data(dict of ActorTableData
  22. # in gcs.proto)}
  23. actors = {}
  24. # {node id hex(str): gcs node info(dict of GcsNodeInfo in gcs.proto)}
  25. nodes = {}
  26. # {node id hex(str): worker list}
  27. node_workers = {}
  28. # {node id hex(str): {actor id hex(str): actor table data}}
  29. node_actors = {}
  30. # {worker id(str): core worker stats}
  31. core_worker_stats = {}
  32. class DataOrganizer:
  33. @staticmethod
  34. @async_loop_forever(dashboard_consts.RAY_DASHBOARD_STATS_PURGING_INTERVAL)
  35. async def purge():
  36. # Purge data that is out of date.
  37. # These data sources are maintained by DashboardHead,
  38. # we do not needs to purge them:
  39. # * agents
  40. # * nodes
  41. alive_nodes = {
  42. node_id
  43. for node_id, node_info in DataSource.nodes.items()
  44. if node_info["state"] == "ALIVE"
  45. }
  46. for key in DataSource.node_stats.keys() - alive_nodes:
  47. DataSource.node_stats.pop(key)
  48. for key in DataSource.node_physical_stats.keys() - alive_nodes:
  49. DataSource.node_physical_stats.pop(key)
  50. @classmethod
  51. @async_loop_forever(dashboard_consts.RAY_DASHBOARD_STATS_UPDATING_INTERVAL)
  52. async def organize(cls, thread_pool_executor):
  53. """
  54. Organizes data: read from (node_physical_stats, node_stats) and updates
  55. (node_workers, node_worker_stats).
  56. This methods is not really async, but DataSource is not thread safe so we need
  57. to make sure it's on the main event loop thread. To avoid blocking the main
  58. event loop, we yield after each node processed.
  59. """
  60. loop = get_or_create_event_loop()
  61. node_workers = {}
  62. core_worker_stats = {}
  63. # NOTE: We copy keys of the `DataSource.nodes` to make sure
  64. # it doesn't change during the iteration (since its being updated
  65. # from another async task)
  66. for node_id in list(DataSource.nodes.keys()):
  67. node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
  68. node_stats = DataSource.node_stats.get(node_id, {})
  69. # Offloads the blocking operation to a thread pool executor. This also
  70. # yields to the event loop.
  71. workers = await loop.run_in_executor(
  72. thread_pool_executor,
  73. cls._extract_workers_for_node,
  74. node_physical_stats,
  75. node_stats,
  76. )
  77. for worker in workers:
  78. for stats in worker.get("coreWorkerStats", []):
  79. worker_id = stats["workerId"]
  80. core_worker_stats[worker_id] = stats
  81. node_workers[node_id] = workers
  82. DataSource.node_workers = node_workers
  83. DataSource.core_worker_stats = core_worker_stats
  84. @classmethod
  85. def _extract_workers_for_node(cls, node_physical_stats, node_stats):
  86. workers = []
  87. # Merge coreWorkerStats (node stats) to workers (node physical stats)
  88. pid_to_worker_stats = {}
  89. pid_to_language = {}
  90. pid_to_job_id = {}
  91. for core_worker_stats in node_stats.get("coreWorkersStats", []):
  92. pid = core_worker_stats["pid"]
  93. pid_to_worker_stats[pid] = core_worker_stats
  94. pid_to_language[pid] = core_worker_stats["language"]
  95. pid_to_job_id[pid] = core_worker_stats["jobId"]
  96. for worker in node_physical_stats.get("workers", []):
  97. worker = dict(worker)
  98. pid = worker["pid"]
  99. core_worker_stats = pid_to_worker_stats.get(pid)
  100. # Empty list means core worker stats is not available.
  101. worker["coreWorkerStats"] = [core_worker_stats] if core_worker_stats else []
  102. worker["language"] = pid_to_language.get(
  103. pid, dashboard_consts.DEFAULT_LANGUAGE
  104. )
  105. worker["jobId"] = pid_to_job_id.get(pid, dashboard_consts.DEFAULT_JOB_ID)
  106. workers.append(worker)
  107. return workers
  108. @classmethod
  109. async def get_node_info(cls, node_id, get_summary=False):
  110. node_physical_stats = dict(DataSource.node_physical_stats.get(node_id, {}))
  111. node_stats = dict(DataSource.node_stats.get(node_id, {}))
  112. node = DataSource.nodes.get(node_id, {})
  113. if get_summary:
  114. node_physical_stats.pop("workers", None)
  115. node_stats.pop("workersStats", None)
  116. else:
  117. node_stats.pop("coreWorkersStats", None)
  118. store_stats = node_stats.get("storeStats", {})
  119. used = int(store_stats.get("objectStoreBytesUsed", 0))
  120. # objectStoreBytesAvail == total in the object_manager.cc definition.
  121. total = int(store_stats.get("objectStoreBytesAvail", 0))
  122. ray_stats = {
  123. "object_store_used_memory": used,
  124. "object_store_available_memory": total - used,
  125. }
  126. node_info = node_physical_stats
  127. # Merge node stats to node physical stats under raylet
  128. node_info["raylet"] = node_stats
  129. node_info["raylet"].update(ray_stats)
  130. # Merge GcsNodeInfo to node physical stats
  131. node_info["raylet"].update(node)
  132. death_info = node.get("deathInfo", {})
  133. node_info["raylet"]["stateMessage"] = compose_state_message(
  134. death_info.get("reason", None), death_info.get("reasonMessage", None)
  135. )
  136. if not get_summary:
  137. actor_table_entries = DataSource.node_actors.get(node_id, {})
  138. # Merge actors to node physical stats
  139. node_info["actors"] = {
  140. actor_id: await DataOrganizer._get_actor_info(actor_table_entry)
  141. for actor_id, actor_table_entry in actor_table_entries.items()
  142. }
  143. # Update workers to node physical stats
  144. node_info["workers"] = DataSource.node_workers.get(node_id, [])
  145. return node_info
  146. @classmethod
  147. async def get_all_node_summary(cls):
  148. return [
  149. # NOTE: We're intentionally awaiting in a loop to avoid excessive
  150. # concurrency spinning up excessive # of tasks for large clusters
  151. await DataOrganizer.get_node_info(node_id, get_summary=True)
  152. for node_id in DataSource.nodes.keys()
  153. ]
  154. @classmethod
  155. async def get_actor_infos(cls, actor_ids: Optional[List[str]] = None):
  156. target_actor_table_entries: dict[str, Optional[dict]]
  157. if actor_ids is not None:
  158. target_actor_table_entries = {
  159. actor_id: DataSource.actors.get(actor_id) for actor_id in actor_ids
  160. }
  161. else:
  162. target_actor_table_entries = DataSource.actors
  163. return {
  164. actor_id: await DataOrganizer._get_actor_info(actor_table_entry)
  165. for actor_id, actor_table_entry in target_actor_table_entries.items()
  166. }
  167. @staticmethod
  168. async def _get_actor_info(actor: Optional[dict]) -> Optional[dict]:
  169. if actor is None:
  170. return None
  171. actor = actor.copy()
  172. worker_id = actor["address"]["workerId"]
  173. core_worker_stats = DataSource.core_worker_stats.get(worker_id, {})
  174. actor.update(core_worker_stats)
  175. # TODO(fyrestone): remove this, give a link from actor
  176. # info to worker info in front-end.
  177. node_id = actor["address"]["nodeId"]
  178. pid = core_worker_stats.get("pid")
  179. node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
  180. actor_process_stats = None
  181. actor_process_gpu_stats = []
  182. if pid:
  183. for process_stats in node_physical_stats.get("workers", []):
  184. if process_stats["pid"] == pid:
  185. actor_process_stats = process_stats
  186. break
  187. for gpu_stats in node_physical_stats.get("gpus", []):
  188. # gpu_stats.get("processesPids") can be None, an empty list or a
  189. # list of dictionaries.
  190. for process in gpu_stats.get("processesPids") or []:
  191. if process["pid"] == pid:
  192. actor_process_gpu_stats.append(gpu_stats)
  193. break
  194. actor["gpus"] = actor_process_gpu_stats
  195. actor["processStats"] = actor_process_stats
  196. actor["mem"] = node_physical_stats.get("mem", [])
  197. required_resources = parse_pg_formatted_resources_to_original(
  198. actor["requiredResources"]
  199. )
  200. actor["requiredResources"] = required_resources
  201. return actor