process_watcher.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. import asyncio
  2. import io
  3. import logging
  4. import os
  5. import sys
  6. from concurrent.futures import ThreadPoolExecutor
  7. import ray
  8. import ray._private.ray_constants as ray_constants
  9. import ray.dashboard.consts as dashboard_consts
  10. from ray._common.utils import run_background_task
  11. from ray._raylet import GcsClient
  12. from ray.dashboard.consts import _PARENT_DEATH_THREASHOLD
  13. # Import psutil after ray so the packaged version is used.
  14. import psutil
  15. logger = logging.getLogger(__name__)
  16. # TODO: move all consts from dashboard_consts to ray_constants and rename to remove
  17. # DASHBOARD_ prefixes.
  18. # Publishes at most this number of lines of Raylet logs, when the Raylet dies
  19. # unexpectedly.
  20. _RAYLET_LOG_MAX_PUBLISH_LINES = 20
  21. # Reads at most this amount of Raylet logs from the tail, for publishing and
  22. # checking if the Raylet was terminated gracefully.
  23. _RAYLET_LOG_MAX_TAIL_SIZE = 1 * 1024**2
  24. try:
  25. create_task = asyncio.create_task
  26. except AttributeError:
  27. create_task = asyncio.ensure_future
  28. def get_raylet_pid():
  29. # TODO(edoakes): RAY_RAYLET_PID isn't properly set on Windows. This is
  30. # only used for fate-sharing with the raylet and we need a different
  31. # fate-sharing mechanism for Windows anyways.
  32. if sys.platform in ["win32", "cygwin"]:
  33. return None
  34. raylet_pid = int(os.environ["RAY_RAYLET_PID"])
  35. assert raylet_pid > 0
  36. logger.info("raylet pid is %s", raylet_pid)
  37. return raylet_pid
  38. def create_check_raylet_task(log_dir, gcs_client, parent_dead_callback, loop):
  39. """
  40. Creates an asyncio task to periodically check if the raylet process is still
  41. running. If raylet is dead for _PARENT_DEATH_THREASHOLD (5) times, prepare to exit
  42. as follows:
  43. - Write logs about whether the raylet exit is graceful, by looking into the raylet
  44. log and search for term "SIGTERM",
  45. - Flush the logs via GcsClient,
  46. - Exit.
  47. """
  48. if sys.platform in ["win32", "cygwin"]:
  49. raise RuntimeError("can't check raylet process in Windows.")
  50. raylet_pid = get_raylet_pid()
  51. if dashboard_consts.PARENT_HEALTH_CHECK_BY_PIPE:
  52. logger.info("check_parent_via_pipe")
  53. check_parent_task = _check_parent_via_pipe(
  54. log_dir, gcs_client, loop, parent_dead_callback
  55. )
  56. else:
  57. logger.info("_check_parent")
  58. check_parent_task = _check_parent(
  59. raylet_pid, log_dir, gcs_client, parent_dead_callback
  60. )
  61. return run_background_task(check_parent_task)
  62. def report_raylet_error_logs(log_dir: str, gcs_client: GcsClient):
  63. log_path = os.path.join(log_dir, "raylet.out")
  64. error = False
  65. msg = "Raylet is terminated. "
  66. try:
  67. with open(log_path, "r", encoding="utf-8") as f:
  68. # Seek to _RAYLET_LOG_MAX_TAIL_SIZE from the end if the
  69. # file is larger than that.
  70. f.seek(0, io.SEEK_END)
  71. pos = max(0, f.tell() - _RAYLET_LOG_MAX_TAIL_SIZE)
  72. f.seek(pos, io.SEEK_SET)
  73. # Read remaining logs by lines.
  74. raylet_logs = f.readlines()
  75. # Assume the SIGTERM message must exist within the last
  76. # _RAYLET_LOG_MAX_TAIL_SIZE of the log file.
  77. if any("Raylet received SIGTERM" in line for line in raylet_logs):
  78. msg += "Termination is graceful."
  79. logger.info(msg)
  80. else:
  81. msg += (
  82. "Termination is unexpected. Possible reasons "
  83. "include: (1) SIGKILL by the user or system "
  84. "OOM killer, (2) Invalid memory access from "
  85. "Raylet causing SIGSEGV or SIGBUS, "
  86. "(3) Other termination signals. "
  87. f"Last {_RAYLET_LOG_MAX_PUBLISH_LINES} lines "
  88. "of the Raylet logs:\n"
  89. )
  90. msg += " " + " ".join(
  91. raylet_logs[-_RAYLET_LOG_MAX_PUBLISH_LINES:]
  92. )
  93. error = True
  94. except Exception as e:
  95. msg += f"Failed to read Raylet logs at {log_path}: {e}!"
  96. logger.exception(msg)
  97. error = True
  98. if error:
  99. logger.error(msg)
  100. # TODO: switch to async if necessary.
  101. ray._private.utils.publish_error_to_driver(
  102. ray_constants.RAYLET_DIED_ERROR,
  103. msg,
  104. gcs_client=gcs_client,
  105. )
  106. else:
  107. logger.info(msg)
  108. async def _check_parent_via_pipe(
  109. log_dir: str, gcs_client: GcsClient, loop, parent_dead_callback
  110. ):
  111. while True:
  112. try:
  113. # Read input asynchronously.
  114. # The parent (raylet) should have redirected its pipe
  115. # to stdin. If we read 0 bytes from stdin, it means
  116. # the process is dead.
  117. with ThreadPoolExecutor(max_workers=1) as executor:
  118. input_data = await loop.run_in_executor(
  119. executor, lambda: sys.stdin.readline()
  120. )
  121. if len(input_data) == 0:
  122. # cannot read bytes from parent == parent is dead.
  123. parent_dead_callback("_check_parent_via_pipe: The parent is dead.")
  124. report_raylet_error_logs(log_dir, gcs_client)
  125. sys.exit(0)
  126. except Exception as e:
  127. logger.exception(
  128. "raylet health checking is failed. "
  129. f"The agent process may leak. Exception: {e}"
  130. )
  131. async def _check_parent(raylet_pid, log_dir, gcs_client, parent_dead_callback):
  132. """Check if raylet is dead and fate-share if it is."""
  133. try:
  134. curr_proc = psutil.Process()
  135. parent_death_cnt = 0
  136. while True:
  137. parent = curr_proc.parent()
  138. # If the parent is dead, it is None.
  139. parent_gone = parent is None
  140. init_assigned_for_parent = False
  141. parent_changed = False
  142. if parent:
  143. # Sometimes, the parent is changed to the `init` process.
  144. # In this case, the parent.pid is 1.
  145. init_assigned_for_parent = parent.pid == 1
  146. # Sometimes, the parent is dead, and the pid is reused
  147. # by other processes. In this case, this condition is triggered.
  148. parent_changed = raylet_pid != parent.pid
  149. if parent_gone or init_assigned_for_parent or parent_changed:
  150. parent_death_cnt += 1
  151. logger.warning(
  152. f"Raylet is considered dead {parent_death_cnt} X. "
  153. f"If it reaches to {_PARENT_DEATH_THREASHOLD}, the agent "
  154. f"will kill itself. Parent: {parent}, "
  155. f"parent_gone: {parent_gone}, "
  156. f"init_assigned_for_parent: {init_assigned_for_parent}, "
  157. f"parent_changed: {parent_changed}."
  158. )
  159. if parent_death_cnt < _PARENT_DEATH_THREASHOLD:
  160. await asyncio.sleep(
  161. dashboard_consts.DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_S
  162. )
  163. continue
  164. parent_dead_callback("_check_parent: The parent is dead.")
  165. report_raylet_error_logs(log_dir, gcs_client)
  166. sys.exit(0)
  167. else:
  168. parent_death_cnt = 0
  169. await asyncio.sleep(
  170. dashboard_consts.DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_S
  171. )
  172. except Exception:
  173. logger.exception("Failed to check parent PID, exiting.")
  174. sys.exit(1)