node_head.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. import asyncio
  2. import json
  3. import logging
  4. import time
  5. from collections import defaultdict, deque
  6. from concurrent.futures import ThreadPoolExecutor
  7. from itertools import chain
  8. from typing import Any, AsyncGenerator, Dict, Iterable, List, Optional, Set
  9. import aiohttp.web
  10. import grpc
  11. import ray._private.utils
  12. import ray.dashboard.optional_utils as dashboard_optional_utils
  13. import ray.dashboard.utils as dashboard_utils
  14. from ray._common.utils import get_or_create_event_loop
  15. from ray._private import ray_constants
  16. from ray._private.collections_utils import split
  17. from ray._private.gcs_pubsub import (
  18. GcsAioActorSubscriber,
  19. GcsAioNodeInfoSubscriber,
  20. GcsAioResourceUsageSubscriber,
  21. )
  22. from ray._private.grpc_utils import init_grpc_channel
  23. from ray._private.ray_constants import (
  24. DEBUG_AUTOSCALING_ERROR,
  25. DEBUG_AUTOSCALING_STATUS,
  26. env_integer,
  27. )
  28. from ray.autoscaler._private.util import (
  29. LoadMetricsSummary,
  30. get_per_node_breakdown_as_dict,
  31. parse_usage,
  32. )
  33. from ray.core.generated import gcs_pb2, node_manager_pb2, node_manager_pb2_grpc
  34. from ray.dashboard.consts import (
  35. DASHBOARD_AGENT_ADDR_IP_PREFIX,
  36. DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX,
  37. GCS_RPC_TIMEOUT_SECONDS,
  38. )
  39. from ray.dashboard.modules.node import actor_consts, node_consts
  40. from ray.dashboard.modules.node.datacenter import DataOrganizer, DataSource
  41. from ray.dashboard.modules.reporter.reporter_models import StatsPayload
  42. from ray.dashboard.subprocesses.module import SubprocessModule
  43. from ray.dashboard.subprocesses.routes import SubprocessRouteTable as routes
  44. from ray.dashboard.utils import async_loop_forever
  logger = logging.getLogger(__name__)

  # NOTE: The node executor of this head module defaults to a single worker
  #       thread on purpose: less concurrency means less GIL contention.
  #       The value can be overridden through the environment variable below.
  RAY_DASHBOARD_NODE_HEAD_TPE_MAX_WORKERS = env_integer(
      "RAY_DASHBOARD_NODE_HEAD_TPE_MAX_WORKERS", 1
  )

  # Number of destroyed actors that may stay in the cache before the cleanup
  # loop starts evicting the oldest ones. Clamped so it is never negative.
  MAX_DESTROYED_ACTORS_TO_CACHE = max(
      ray._config.maximum_gcs_destroyed_actor_cached_count(), 0
  )

  # Pause between two passes of the actor cleanup loop, in seconds.
  ACTOR_CLEANUP_FREQUENCY = 1

  # Fields copied from an incremental actor update onto the cached actor entry
  # (all other fields of an already-known actor are left untouched).
  ACTOR_TABLE_STATE_COLUMNS = (
      "state", "address", "numRestarts", "timestamp", "pid",
      "exitDetail", "startTime", "endTime", "reprName",
  )
  def _gcs_node_info_to_dict(message: gcs_pb2.GcsNodeInfo) -> dict:
      """Convert a ``GcsNodeInfo`` protobuf message into a plain dict.

      Fields without presence are always emitted so that callers can rely on
      keys such as ``state`` or ``isHeadNode`` being there.
      """
      # NOTE(review): the second argument presumably names the binary ID fields
      # that `message_to_dict` renders as hex strings — confirm against
      # `dashboard_utils.message_to_dict`.
      id_fields = {"nodeId"}
      return dashboard_utils.message_to_dict(
          message,
          id_fields,
          always_print_fields_with_no_presence=True,
      )
  def _actor_table_data_to_dict(message):
      """Convert an actor table protobuf message into a trimmed-down dict.

      The full actor schema (src/ray/protobuf/gcs.proto) is large and the
      dashboard only needs part of it, so just a whitelist of fields is kept
      to limit memory usage. On top of the whitelisted fields the result has:

      - ``actorClass``: copy of ``className``
      - ``exitDetail``: error text taken from ``deathCause`` ("-" if none)
      - ``startTime`` / ``endTime``: coerced to ``int``
      - ``requiredResources`` / ``labelSelector``: plain dict copies of the
        corresponding message fields
      """
      id_fields = {
          "actorId",
          "parentId",
          "jobId",
          "workerId",
          "nodeId",
          "callerId",
          "taskId",
          "parentTaskId",
          "sourceActorId",
          "placementGroupId",
      }
      full = dashboard_utils.message_to_dict(
          message, id_fields, always_print_fields_with_no_presence=True
      )

      kept_fields = {
          "actorId",
          "jobId",
          "pid",
          "address",
          "state",
          "name",
          "numRestarts",
          "timestamp",
          "className",
          "startTime",
          "endTime",
          "reprName",
          "placementGroupId",
          "callSite",
          "labelSelector",
      }
      slim = {key: value for key, value in full.items() if key in kept_fields}
      slim["actorClass"] = full["className"]

      # (death-cause context, key holding the text), checked in this order;
      # the first context present in the death cause wins.
      detail_sources = (
          ("actorDiedErrorContext", "errorMessage"),
          ("runtimeEnvFailedContext", "errorMessage"),
          ("actorUnschedulableContext", "errorMessage"),
          ("creationTaskFailureContext", "formattedExceptionString"),
      )
      exit_detail = "-"
      if "deathCause" in full:
          death_cause = full["deathCause"]
          for context_key, text_key in detail_sources:
              if context_key in death_cause:
                  exit_detail = death_cause[context_key][text_key]
                  break
      slim["exitDetail"] = exit_detail

      for time_field in ("startTime", "endTime"):
          slim[time_field] = int(slim[time_field])

      slim["requiredResources"] = dict(message.required_resources)
      slim["labelSelector"] = dict(message.label_selector)
      return slim
  class NodeHead(SubprocessModule):
      """Dashboard head module that mirrors node and actor state from GCS.

      Background tasks started by :meth:`run` keep the shared ``DataSource``
      tables (``nodes``, ``node_stats``, ``node_physical_stats``, ``actors``,
      ``node_actors``) up to date; the HTTP routes defined on this class serve
      that data.
      """

      def __init__(self, *args, **kwargs):
          super().__init__(*args, **kwargs)
          # node id (hex) -> NodeManagerService stub, maintained by _update_node
          self._stubs = {}
          # Forwarded as `include_memory_info` in GetNodeStats requests
          self._collect_memory_info = False
          # The time where the module is started.
          self._module_start_time = time.time()
          # The time it takes until the head node is registered. None means
          # head node hasn't been registered.
          self._head_node_registration_time_s = None
          # The node ID of the current head node
          self._registered_head_node_id = None
          # Queue of dead nodes to be removed, up to MAX_DEAD_NODES_TO_CACHE
          self._dead_node_queue = deque()
          self._node_executor = ThreadPoolExecutor(
              max_workers=RAY_DASHBOARD_NODE_HEAD_TPE_MAX_WORKERS,
              thread_name_prefix="node_head_node_executor",
          )

          self._gcs_actor_channel_subscriber = None
          # A queue of dead actors in order of when they died
          self._destroyed_actors_queue = deque()

          # -- Internal state --
          self._loop = get_or_create_event_loop()
          # NOTE: This executor is intentionally constrained to just 1 thread to
          #       limit its concurrency, therefore reducing potential for GIL
          #       contention
          self._actor_executor = ThreadPoolExecutor(
              max_workers=1, thread_name_prefix="node_head_actor_executor"
          )
          # Strong references to the tasks created in run(); a task removes
          # itself from the set once it is done.
          self._background_tasks: Set[asyncio.Task] = set()

      def get_internal_states(self):
          """Return a small dict of diagnostic values used in log messages."""
          return {
              "head_node_registration_time_s": self._head_node_registration_time_s,
              "registered_nodes": len(DataSource.nodes),
              "module_lifetime_s": time.time() - self._module_start_time,
          }

      async def _subscribe_for_node_updates(self) -> AsyncGenerator[dict, None]:
          """
          Yields the initial state of all nodes, then yields the updated state of nodes.

          It makes GetAllNodeInfo call only once after the subscription is done, to get
          the initial state of the nodes.
          """
          subscriber = GcsAioNodeInfoSubscriber(address=self.gcs_address)
          await subscriber.subscribe()

          # Get all node info from GCS. To prevent Time-of-check to time-of-use issue [1],
          # it happens after the subscription. That is, an update between
          # get-all-node-info and the subscription is not missed.
          # [1] https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
          all_node_info = await self.gcs_client.async_get_all_node_info(timeout=None)

          def _convert_to_dict(messages: Iterable[gcs_pb2.GcsNodeInfo]) -> List[dict]:
              return [_gcs_node_info_to_dict(m) for m in messages]

          # Protobuf -> dict conversion is offloaded to the node executor.
          all_node_infos = await self._loop.run_in_executor(
              self._node_executor,
              _convert_to_dict,
              all_node_info.values(),
          )

          for node in all_node_infos:
              yield node

          while True:
              try:
                  node_id_updated_info_tuples = await subscriber.poll(
                      batch_size=node_consts.RAY_DASHBOARD_NODE_SUBSCRIBER_POLL_SIZE
                  )

                  if node_id_updated_info_tuples:
                      # Only the messages are needed; the IDs are part of them.
                      _, updated_infos_proto = zip(*node_id_updated_info_tuples)
                  else:
                      updated_infos_proto = []

                  updated_infos = await self._loop.run_in_executor(
                      self._node_executor,
                      _convert_to_dict,
                      updated_infos_proto,
                  )

                  for node in updated_infos:
                      yield node
              except Exception:
                  logger.exception("Failed handling updated nodes.")

      async def _update_node(self, node: dict):
          """Apply one node update to the internal state.

          - Records a newly ALIVE head node and publishes its ID to the
            internal KV.
          - For a DEAD node, removes its agent addresses from the internal KV
            and evicts the oldest cached dead node once more than
            ``MAX_DEAD_NODES_TO_CACHE`` dead nodes are tracked.
          - Stores the node info in ``DataSource.nodes`` and creates a
            NodeManagerService stub for it.

          Args:
              node: Node info dict as produced by ``_gcs_node_info_to_dict``.
          """
          node_id = node["nodeId"]  # hex

          if (
              node["isHeadNode"]
              and node["state"] == "ALIVE"
              and self._registered_head_node_id != node_id
          ):
              if self._registered_head_node_id is not None:
                  logger.warning(
                      "A new head node has become ALIVE. New head node ID: %s, old head node ID: %s, internal states: %s",
                      node_id,
                      self._registered_head_node_id,
                      self.get_internal_states(),
                  )
              self._registered_head_node_id = node_id
              self._head_node_registration_time_s = time.time() - self._module_start_time
              # Put head node ID in the internal KV to be read by JobAgent.
              # TODO(architkulkarni): Remove once State API exposes which
              # node is the head node.
              await self.gcs_client.async_internal_kv_put(
                  ray_constants.KV_HEAD_NODE_ID_KEY,
                  node_id.encode(),
                  overwrite=True,
                  namespace=ray_constants.KV_NAMESPACE_JOB,
                  timeout=GCS_RPC_TIMEOUT_SECONDS,
              )

          assert node["state"] in ["ALIVE", "DEAD"]
          is_alive = node["state"] == "ALIVE"
          if not is_alive:
              # Remove the agent address from the internal KV.
              keys = [
                  f"{DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX}{node_id}",
                  f"{DASHBOARD_AGENT_ADDR_IP_PREFIX}{node['nodeManagerAddress']}",
              ]
              tasks = [
                  self.gcs_client.async_internal_kv_del(
                      key,
                      del_by_prefix=False,
                      namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
                      timeout=GCS_RPC_TIMEOUT_SECONDS,
                  )
                  for key in keys
              ]
              await asyncio.gather(*tasks)

              self._dead_node_queue.append(node_id)
              if len(self._dead_node_queue) > node_consts.MAX_DEAD_NODES_TO_CACHE:
                  # Use a dedicated name for the evicted node: rebinding
                  # `node_id` here would make the code below store this
                  # update under the evicted node's ID.
                  evicted_node_id = self._dead_node_queue.popleft()
                  DataSource.nodes.pop(evicted_node_id, None)
                  self._stubs.pop(evicted_node_id, None)

          DataSource.nodes[node_id] = node
          # TODO(fyrestone): Handle exceptions.
          address = "{}:{}".format(
              node["nodeManagerAddress"], int(node["nodeManagerPort"])
          )
          options = ray_constants.GLOBAL_GRPC_OPTIONS
          channel = init_grpc_channel(address, options, asynchronous=True)
          stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
          self._stubs[node_id] = stub

      async def _update_nodes(self):
          """
          Subscribe to node updates and update the internal states. If the head node is
          not registered after RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUT, it logs a
          warning only once.
          """
          warning_shown = False
          async for node in self._subscribe_for_node_updates():
              await self._update_node(node)
              if not self._head_node_registration_time_s:
                  # head node is not registered yet
                  if (
                      not warning_shown
                      and (time.time() - self._module_start_time)
                      > node_consts.RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUT
                  ):
                      logger.warning(
                          "Head node is not registered even after "
                          f"{node_consts.RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUT} seconds. "
                          "The API server might not work correctly. Please "
                          "report a Github issue. Internal states :"
                          f"{self.get_internal_states()}"
                      )
                      warning_shown = True

      async def get_nodes_logical_resources(self) -> dict:
          """Return the logical resource usage per node.

          With autoscaler v2 the result maps node ID to a newline-joined usage
          string built from the GCS cluster status. Otherwise it is derived
          from the legacy autoscaler status stored in the internal KV.

          Returns:
              The per-node breakdown, or ``{}`` when the cluster status cannot
              be fetched, no status / load metrics report is available, or the
              legacy autoscaler reported an error.
          """
          from ray.autoscaler.v2.utils import is_autoscaler_v2

          if is_autoscaler_v2():
              from ray.autoscaler.v2.schema import Stats
              from ray.autoscaler.v2.sdk import ClusterStatusParser

              try:
                  # Time the request so the parser can report GCS latency.
                  req_time = time.time()
                  cluster_status = await self.gcs_client.async_get_cluster_status()
                  reply_time = time.time()
                  cluster_status = ClusterStatusParser.from_get_cluster_status_reply(
                      cluster_status,
                      stats=Stats(
                          gcs_request_time_s=reply_time - req_time, request_ts_s=req_time
                      ),
                  )
              except Exception:
                  logger.exception("Error getting cluster status")
                  return {}

              per_node_resources = {}
              # TODO(rickyx): we should just return structure data rather than strings.
              for node in chain(cluster_status.active_nodes, cluster_status.idle_nodes):
                  if not node.resource_usage:
                      continue

                  usage_dict = {
                      r.resource_name: (r.used, r.total)
                      for r in node.resource_usage.usage
                  }
                  per_node_resources[node.node_id] = "\n".join(
                      parse_usage(usage_dict, verbose=True)
                  )

              return per_node_resources

          # Legacy autoscaler status code.
          (status_string, error) = await asyncio.gather(
              *[
                  self.gcs_client.async_internal_kv_get(
                      key.encode(), namespace=None, timeout=GCS_RPC_TIMEOUT_SECONDS
                  )
                  for key in [
                      DEBUG_AUTOSCALING_STATUS,
                      DEBUG_AUTOSCALING_ERROR,
                  ]
              ]
          )
          if not status_string:
              return {}
          status_dict = json.loads(status_string)

          lm_summary_dict = status_dict.get("load_metrics_report")
          if not lm_summary_dict:
              # Without a load metrics report there is nothing to break down
              # (previously this path failed on an unbound local variable).
              return {}

          lm_summary = LoadMetricsSummary(**lm_summary_dict)
          node_logical_resources = get_per_node_breakdown_as_dict(lm_summary)
          return node_logical_resources if error is None else {}

      @routes.get("/nodes")
      @dashboard_optional_utils.aiohttp_cache
      async def get_all_nodes(self, req) -> aiohttp.web.Response:
          """Serve ``/nodes``; the ``view`` query parameter selects the payload.

          ``view=summary`` returns the node summary plus logical resources,
          ``view=hostNameList`` (case-insensitive) returns the hostnames of
          ALIVE nodes, anything else yields an error response.
          """
          view = req.query.get("view")
          if view == "summary":
              all_node_summary_task = DataOrganizer.get_all_node_summary()
              nodes_logical_resource_task = self.get_nodes_logical_resources()

              all_node_summary, nodes_logical_resources = await asyncio.gather(
                  all_node_summary_task, nodes_logical_resource_task
              )

              return dashboard_optional_utils.rest_response(
                  status_code=dashboard_utils.HTTPStatusCode.OK,
                  message="Node summary fetched.",
                  summary=all_node_summary,
                  node_logical_resources=nodes_logical_resources,
              )
          elif view is not None and view.lower() == "hostNameList".lower():
              alive_hostnames = {
                  node["nodeManagerHostname"]
                  for node in DataSource.nodes.values()
                  if node["state"] == "ALIVE"
              }
              return dashboard_optional_utils.rest_response(
                  status_code=dashboard_utils.HTTPStatusCode.OK,
                  message="Node hostname list fetched.",
                  host_name_list=list(alive_hostnames),
              )
          else:
              return dashboard_optional_utils.rest_response(
                  status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
                  message=f"Unknown view {view}",
              )

      @routes.get("/nodes/{node_id}")
      @dashboard_optional_utils.aiohttp_cache
      async def get_node(self, req) -> aiohttp.web.Response:
          """Serve ``/nodes/{node_id}`` with the details of a single node."""
          node_id = req.match_info.get("node_id")
          node_info = await DataOrganizer.get_node_info(node_id)
          return dashboard_optional_utils.rest_response(
              status_code=dashboard_utils.HTTPStatusCode.OK,
              message="Node details fetched.",
              detail=node_info,
          )

      @async_loop_forever(node_consts.NODE_STATS_UPDATE_INTERVAL_SECONDS)
      async def _update_node_stats(self):
          """Fetch GetNodeStats from every ALIVE node into ``DataSource.node_stats``.

          Failed requests are logged and leave the previous entry of that node
          untouched.
          """
          timeout = max(2, node_consts.NODE_STATS_UPDATE_INTERVAL_SECONDS - 1)

          # NOTE: We copy stubs to make sure
          #       it doesn't change during the iteration (since its being updated
          #       from another async task)
          current_stub_node_id_tuples = list(self._stubs.items())

          node_ids = []
          get_node_stats_tasks = []

          for node_id, stub in current_stub_node_id_tuples:
              node_info = DataSource.nodes.get(node_id)
              # Skip nodes that are no longer known (a stub without a matching
              # node entry) as well as nodes that are not alive.
              if node_info is None or node_info["state"] != "ALIVE":
                  continue

              node_ids.append(node_id)
              get_node_stats_tasks.append(
                  stub.GetNodeStats(
                      node_manager_pb2.GetNodeStatsRequest(
                          include_memory_info=self._collect_memory_info
                      ),
                      timeout=timeout,
                  )
              )

          responses = []

          # NOTE: We're chunking up fetching of the stats to run in batches of no more
          #       than 100 nodes at a time to avoid flooding the event-loop's queue
          #       with potentially a large, uninterrupted sequence of tasks updating
          #       the node stats for very large clusters.
          for get_node_stats_tasks_chunk in split(get_node_stats_tasks, 100):
              current_chunk_responses = await asyncio.gather(
                  *get_node_stats_tasks_chunk,
                  return_exceptions=True,
              )

              responses.extend(current_chunk_responses)

              # We're doing short (25ms) yield after every chunk to make sure
              #   - We're not overloading the event-loop with excessive # of tasks
              #   - Allowing 10k nodes stats fetches be sent out performed in 2.5s
              await asyncio.sleep(0.025)

          def postprocess(node_id_response_tuples):
              """Pure function reorganizing the data into {node_id: stats}."""
              new_node_stats = {}

              for node_id, response in node_id_response_tuples:
                  if isinstance(response, asyncio.CancelledError):
                      pass
                  elif isinstance(response, grpc.RpcError):
                      if response.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
                          message = (
                              f"Cannot reach the node, {node_id}, after timeout "
                              f" {timeout}. This node may have been overloaded, "
                              "terminated, or the network is slow."
                          )
                      elif response.code() == grpc.StatusCode.UNAVAILABLE:
                          message = (
                              f"Cannot reach the node, {node_id}. "
                              "The node may have been terminated."
                          )
                      else:
                          message = f"Error updating node stats of {node_id}."

                      logger.error(message, exc_info=response)
                  elif isinstance(response, Exception):
                      logger.error(
                          f"Error updating node stats of {node_id}.", exc_info=response
                      )
                  else:
                      new_node_stats[node_id] = dashboard_utils.node_stats_to_dict(
                          response
                      )

              return new_node_stats

          # NOTE: Zip will silently truncate to shorter argument that potentially
          #       could lead to subtle hard to catch issues, hence the assertion
          assert len(node_ids) == len(
              responses
          ), f"node_ids({len(node_ids)}): {node_ids}, responses({len(responses)}): {responses}"

          new_node_stats = await self._loop.run_in_executor(
              self._node_executor, postprocess, zip(node_ids, responses)
          )

          for node_id, new_stat in new_node_stats.items():
              DataSource.node_stats[node_id] = new_stat

      async def _update_node_physical_stats(self):
          """
          Update DataSource.node_physical_stats by subscribing to the GCS resource usage.
          """
          subscriber = GcsAioResourceUsageSubscriber(address=self.gcs_address)
          await subscriber.subscribe()

          while True:
              try:
                  # The key has the form 'RAY_REPORTER:{node id hex}',
                  # e.g. 'RAY_REPORTER:2b4fbd...'
                  # NOTE(review): `key.split(":")` below needs a str key —
                  # presumably the subscriber already decodes it; confirm.
                  key, data = await subscriber.poll()
                  if key is None:
                      continue

                  # NOTE: Every iteration is executed inside the thread-pool executor
                  #       (TPE) to avoid blocking the Dashboard's event-loop
                  parsed_data = await self._loop.run_in_executor(
                      self._node_executor, _parse_node_stats, data
                  )

                  node_id = key.split(":")[-1]
                  DataSource.node_physical_stats[node_id] = parsed_data
              except Exception:
                  logger.exception(
                      "Error receiving node physical stats from _update_node_physical_stats."
                  )

      async def _update_actors(self):
          """
          Processes actor info. First gets all actors from GCS, then subscribes to
          actor updates. For each actor update, updates DataSource.node_actors and
          DataSource.actors.
          """

          # To prevent Time-of-check to time-of-use issue [1], the get-all-actor-info
          # happens after the subscription. That is, an update between get-all-actor-info
          # and the subscription is not missed.
          #
          # [1] https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
          gcs_addr = self.gcs_address
          actor_channel_subscriber = GcsAioActorSubscriber(address=gcs_addr)
          await actor_channel_subscriber.subscribe()

          # Get all actor info.
          while True:
              try:
                  logger.info("Getting all actor info from GCS.")

                  actor_dicts = await self._get_all_actors()
                  # Update actors
                  DataSource.actors = actor_dicts

                  # Update node actors and job actors.
                  node_actors = defaultdict(dict)
                  for actor_id_bytes, updated_actor_table in actor_dicts.items():
                      node_id = updated_actor_table["address"]["nodeId"]
                      # Update only when node_id is not Nil.
                      if node_id != actor_consts.NIL_NODE_ID:
                          node_actors[node_id][actor_id_bytes] = updated_actor_table

                  # Update node's actor info
                  DataSource.node_actors = node_actors

                  logger.info("Received %d actor info from GCS.", len(actor_dicts))

                  # Break, once all initial actors are successfully fetched
                  break
              except Exception:
                  logger.exception("Error Getting all actor info from GCS")
                  await asyncio.sleep(
                      actor_consts.RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS
                  )

          # Pull incremental updates from the GCS channel
          while True:
              try:
                  updated_actor_table_entries = await self._poll_updated_actor_table_data(
                      actor_channel_subscriber
                  )

                  for (
                      actor_id,
                      updated_actor_table,
                  ) in updated_actor_table_entries.items():
                      self._process_updated_actor_table(actor_id, updated_actor_table)

                  # TODO emit metrics
                  logger.debug(
                      f"Total events processed: {len(updated_actor_table_entries)}, "
                      f"queue size: {actor_channel_subscriber.queue_size}"
                  )
              except Exception:
                  logger.exception("Error processing actor info from GCS.")

      async def _poll_updated_actor_table_data(
          self, actor_channel_subscriber: GcsAioActorSubscriber
      ) -> Dict[str, Dict[str, Any]]:
          """Poll one batch of actor updates and return ``{actor id hex: dict}``.

          Entries whose actor ID is ``None`` are dropped.
          """
          # TODO make batch size configurable
          batch = await actor_channel_subscriber.poll(batch_size=200)

          # NOTE: We're offloading conversion to a TPE to make sure we're not
          #       blocking the event-loop for prolonged period of time irrespective
          #       of the batch size
          def _convert_to_dict():
              return {
                  actor_id_bytes.hex(): _actor_table_data_to_dict(
                      actor_table_data_message
                  )
                  for actor_id_bytes, actor_table_data_message in batch
                  if actor_id_bytes is not None
              }

          return await self._loop.run_in_executor(self._actor_executor, _convert_to_dict)

      def _process_updated_actor_table(
          self, actor_id: str, actor_table_data: Dict[str, Any]
      ):
          """Merge one actor update into ``DataSource.actors`` / ``node_actors``.

          NOTE: This method has to be executed on the event-loop, provided that it
          accesses DataSource data structures (to follow its thread-safety model)
          """

          # If actor is not new registered but updated, we only update
          # states related fields.
          actor = DataSource.actors.get(actor_id)

          if actor and actor_table_data["state"] != "DEPENDENCIES_UNREADY":
              for k in ACTOR_TABLE_STATE_COLUMNS:
                  if k in actor_table_data:
                      actor[k] = actor_table_data[k]
              actor_table_data = actor

          actor_id = actor_table_data["actorId"]
          node_id = actor_table_data["address"]["nodeId"]

          if actor_table_data["state"] == "DEAD":
              # Remembered so that _cleanup_actors can evict it later.
              self._destroyed_actors_queue.append(actor_id)

          # Update actors.
          DataSource.actors[actor_id] = actor_table_data
          # Update node actors (only when node_id is not Nil).
          if node_id != actor_consts.NIL_NODE_ID:
              node_actors = DataSource.node_actors.get(node_id, {})
              node_actors[actor_id] = actor_table_data
              DataSource.node_actors[node_id] = node_actors

      async def _get_all_actors(self) -> Dict[str, dict]:
          """Fetch all actors from GCS and return ``{actor id hex: dict}``."""
          actors = await self.gcs_client.async_get_all_actor_info(
              timeout=GCS_RPC_TIMEOUT_SECONDS
          )

          # NOTE: We're offloading conversion to a TPE to make sure we're not
          #       blocking the event-loop for prolonged period of time for large clusters
          def _convert_to_dict():
              return {
                  actor_id.hex(): _actor_table_data_to_dict(actor_table_data)
                  for actor_id, actor_table_data in actors.items()
              }

          return await self._loop.run_in_executor(self._actor_executor, _convert_to_dict)

      async def _cleanup_actors(self):
          """Periodically evict the oldest destroyed actors from the cache.

          Runs forever: while more than ``MAX_DESTROYED_ACTORS_TO_CACHE``
          destroyed actors are queued, the oldest one is removed from
          ``DataSource.actors`` and from its node's ``DataSource.node_actors``
          entry; then the loop sleeps ``ACTOR_CLEANUP_FREQUENCY`` seconds.
          """
          while True:
              try:
                  while len(self._destroyed_actors_queue) > MAX_DESTROYED_ACTORS_TO_CACHE:
                      actor_id = self._destroyed_actors_queue.popleft()
                      if actor_id in DataSource.actors:
                          actor = DataSource.actors.pop(actor_id)
                          node_id = actor["address"].get("nodeId")
                          if node_id and node_id != actor_consts.NIL_NODE_ID:
                              # Tolerate a missing node / actor entry instead of
                              # raising KeyError and aborting this trim pass.
                              node_actors = DataSource.node_actors.get(node_id)
                              if node_actors is not None:
                                  node_actors.pop(actor_id, None)
                  await asyncio.sleep(ACTOR_CLEANUP_FREQUENCY)
              except Exception:
                  logger.exception("Error cleaning up actor info from GCS.")

      @routes.get("/logical/actors")
      @dashboard_optional_utils.aiohttp_cache
      async def get_all_actors(self, req) -> aiohttp.web.Response:
          """Serve ``/logical/actors``; ``ids`` is an optional comma-separated filter."""
          actor_ids: Optional[List[str]] = None
          if "ids" in req.query:
              actor_ids = req.query["ids"].split(",")
          actors = await DataOrganizer.get_actor_infos(actor_ids=actor_ids)
          return dashboard_optional_utils.rest_response(
              status_code=dashboard_utils.HTTPStatusCode.OK,
              message="All actors fetched.",
              actors=actors,
              # False to avoid converting Ray resource name to google style.
              # It's not necessary here because the fields are already
              # google formatted when protobuf was converted into dict.
              convert_google_style=False,
          )

      @routes.get("/logical/actors/{actor_id}")
      @dashboard_optional_utils.aiohttp_cache
      async def get_actor(self, req) -> aiohttp.web.Response:
          """Serve ``/logical/actors/{actor_id}`` with the details of one actor."""
          actor_id = req.match_info.get("actor_id")
          actors = await DataOrganizer.get_actor_infos(actor_ids=[actor_id])
          return dashboard_optional_utils.rest_response(
              status_code=dashboard_utils.HTTPStatusCode.OK,
              message="Actor details fetched.",
              detail=actors[actor_id],
          )

      @routes.get("/test/dump")
      async def dump(self, req) -> aiohttp.web.Response:
          """
          Dump all data from datacenter. This is used for testing purpose only.

          Without a ``key`` query parameter every public ``DataSource`` table
          is returned; with ``key`` only that table. Unknown or private keys
          produce an error response.
          """
          key = req.query.get("key")
          if key is None:
              all_data = {
                  k: dict(v)
                  for k, v in DataSource.__dict__.items()
                  if not k.startswith("_")
              }
              return dashboard_optional_utils.rest_response(
                  status_code=dashboard_utils.HTTPStatusCode.OK,
                  message="Fetch all data from datacenter success.",
                  **all_data,
              )

          # Same visibility rule as the dump-everything branch above; also
          # avoids calling dict() on None for keys that do not exist.
          if key.startswith("_") or key not in DataSource.__dict__:
              return dashboard_optional_utils.rest_response(
                  status_code=dashboard_utils.HTTPStatusCode.INTERNAL_ERROR,
                  message=f"Unknown key {key}",
              )

          data = dict(DataSource.__dict__[key])
          return dashboard_optional_utils.rest_response(
              status_code=dashboard_utils.HTTPStatusCode.OK,
              message=f"Fetch {key} from datacenter success.",
              **{key: data},
          )

      async def run(self):
          """Start the module and schedule all background update loops."""
          await super().run()
          coros = [
              self._update_nodes(),
              self._update_node_stats(),
              self._update_node_physical_stats(),
              self._update_actors(),
              self._cleanup_actors(),
              DataOrganizer.purge(),
              DataOrganizer.organize(self._node_executor),
          ]
          for coro in coros:
              task = self._loop.create_task(coro)
              # Keep a reference to the task until it is done.
              self._background_tasks.add(task)
              task.add_done_callback(self._background_tasks.discard)
  def _parse_node_stats(node_stats_str: str) -> dict:
      """Decode a JSON node-stats payload and return it as a dict.

      When ``StatsPayload`` is available (not ``None``) the decoded dict is
      additionally run through ``StatsPayload.parse_obj`` purely as a
      validation step; the parsed model is discarded and the raw dict is
      returned either way.
      """
      payload = json.loads(node_stats_str)
      if StatsPayload is None:
          return payload
      # Validate the response by parsing the payload.
      StatsPayload.parse_obj(payload)
      return payload