utils.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. import asyncio
  2. import dataclasses
  3. import logging
  4. import os
  5. import re
  6. import traceback
  7. from dataclasses import dataclass
  8. from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Union
  9. from ray._private import ray_constants
  10. from ray._raylet import RAY_INTERNAL_NAMESPACE_PREFIX, GcsClient
  11. from ray.dashboard.modules.job.common import (
  12. JOB_ID_METADATA_KEY,
  13. JobInfoStorageClient,
  14. JobStatus,
  15. validate_request_type,
  16. )
  17. from ray.dashboard.modules.job.pydantic_models import DriverInfo, JobDetails, JobType
  18. from ray.runtime_env import RuntimeEnv
  19. try:
  20. # package `aiohttp` is not in ray's minimal dependencies
  21. import aiohttp
  22. from aiohttp.web import Request, Response
  23. except Exception:
  24. aiohttp = None
  25. Request = None
  26. Response = None
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Maximum number of lines per chunk.
# NOTE(review): presumably the per-batch line limit for file_tail_iterator --
# verify against its implementation.
MAX_CHUNK_LINE_LENGTH = 10
# Character-count threshold compared against the accumulated chunk size in
# file_tail_iterator to decide when the current chunk is flushed.
MAX_CHUNK_CHAR_LENGTH = 20000
  30. async def get_head_node_id(gcs_client: GcsClient) -> Optional[str]:
  31. """Fetches Head node id persisted in GCS"""
  32. head_node_id_hex_bytes = await gcs_client.async_internal_kv_get(
  33. ray_constants.KV_HEAD_NODE_ID_KEY,
  34. namespace=ray_constants.KV_NAMESPACE_JOB,
  35. timeout=30,
  36. )
  37. if head_node_id_hex_bytes is None:
  38. return None
  39. return head_node_id_hex_bytes.decode()
  40. def strip_keys_with_value_none(d: Dict[str, Any]) -> Dict[str, Any]:
  41. """Strip keys with value None from a dictionary."""
  42. return {k: v for k, v in d.items() if v is not None}
  43. def redact_url_password(url: str) -> str:
  44. """Redact any passwords in a URL."""
  45. secret = re.findall(r"https?:\/\/.*:(.*)@.*", url)
  46. if len(secret) > 0:
  47. url = url.replace(f":{secret[0]}@", ":<redacted>@")
  48. return url
  49. async def file_tail_iterator(path: str) -> AsyncIterator[Optional[List[str]]]:
  50. """Yield lines from a file as it's written.
  51. Returns lines in batches of up to 10 lines or 20000 characters,
  52. whichever comes first. If it's a chunk of 20000 characters, then
  53. the last line that is yielded could be an incomplete line.
  54. New line characters are kept in the line string.
  55. Returns None until the file exists or if no new line has been written.
  56. """
  57. if not isinstance(path, str):
  58. raise TypeError(f"path must be a string, got {type(path)}.")
  59. while not os.path.exists(path):
  60. logger.debug(f"Path {path} doesn't exist yet.")
  61. yield None
  62. EOF = ""
  63. with open(path, "r") as f:
  64. lines = []
  65. chunk_char_count = 0
  66. curr_line = None
  67. while True:
  68. # We want to flush current chunk in following cases:
  69. # - We accumulated 10 lines
  70. # - We accumulated at least MAX_CHUNK_CHAR_LENGTH total chars
  71. # - We reached EOF
  72. if (
  73. len(lines) >= 10
  74. or chunk_char_count > MAX_CHUNK_CHAR_LENGTH
  75. or curr_line == EOF
  76. ):
  77. # Too many lines, return 10 lines in this chunk, and then
  78. # continue reading the file.
  79. yield lines or None
  80. lines = []
  81. chunk_char_count = 0
  82. # Read next line
  83. curr_line = f.readline()
  84. # `readline` will return
  85. # - '' for EOF
  86. # - '\n' for an empty line in the file
  87. if curr_line != EOF:
  88. # Add line to current chunk
  89. lines.append(curr_line)
  90. chunk_char_count += len(curr_line)
  91. else:
  92. # If EOF is reached sleep for 1s before continuing
  93. await asyncio.sleep(1)
  94. async def parse_and_validate_request(
  95. req: Request, request_type: dataclass
  96. ) -> Union[dataclass, Response]:
  97. """Parse request and cast to request type.
  98. Remove keys with value None to allow newer client versions with new optional fields
  99. to work with older servers.
  100. If parsing failed, return a Response object with status 400 and stacktrace instead.
  101. Args:
  102. req: aiohttp request object.
  103. request_type: dataclass type to cast request to.
  104. Returns:
  105. Parsed request object or Response object with status 400 and stacktrace.
  106. """
  107. import aiohttp
  108. json_data = strip_keys_with_value_none(await req.json())
  109. try:
  110. return validate_request_type(json_data, request_type)
  111. except Exception as e:
  112. logger.info(f"Got invalid request type: {e}")
  113. return Response(
  114. text=traceback.format_exc(),
  115. status=aiohttp.web.HTTPBadRequest.status_code,
  116. )
  117. async def get_driver_jobs(
  118. gcs_client: GcsClient,
  119. job_or_submission_id: Optional[str] = None,
  120. timeout: Optional[int] = None,
  121. ) -> Tuple[Dict[str, JobDetails], Dict[str, DriverInfo]]:
  122. """Returns a tuple of dictionaries related to drivers.
  123. The first dictionary contains all driver jobs and is keyed by the job's id.
  124. The second dictionary contains drivers that belong to submission jobs.
  125. It's keyed by the submission job's submission id.
  126. Only the last driver of a submission job is returned.
  127. An optional job_or_submission_id filter can be provided to only return
  128. jobs with the job id or submission id.
  129. """
  130. job_infos = await gcs_client.async_get_all_job_info(
  131. job_or_submission_id=job_or_submission_id,
  132. skip_submission_job_info_field=True,
  133. skip_is_running_tasks_field=True,
  134. timeout=timeout,
  135. )
  136. # Sort jobs from GCS to follow convention of returning only last driver
  137. # of submission job.
  138. sorted_job_infos = sorted(
  139. job_infos.values(), key=lambda job_table_entry: job_table_entry.job_id.hex()
  140. )
  141. jobs = {}
  142. submission_job_drivers = {}
  143. for job_table_entry in sorted_job_infos:
  144. if job_table_entry.config.ray_namespace.startswith(
  145. RAY_INTERNAL_NAMESPACE_PREFIX
  146. ):
  147. # Skip jobs in any _ray_internal_ namespace
  148. continue
  149. job_id = job_table_entry.job_id.hex()
  150. metadata = dict(job_table_entry.config.metadata)
  151. job_submission_id = metadata.get(JOB_ID_METADATA_KEY)
  152. if not job_submission_id:
  153. driver = DriverInfo(
  154. id=job_id,
  155. node_ip_address=job_table_entry.driver_address.ip_address,
  156. pid=str(job_table_entry.driver_pid),
  157. )
  158. job = JobDetails(
  159. job_id=job_id,
  160. type=JobType.DRIVER,
  161. status=JobStatus.SUCCEEDED
  162. if job_table_entry.is_dead
  163. else JobStatus.RUNNING,
  164. entrypoint=job_table_entry.entrypoint,
  165. start_time=job_table_entry.start_time,
  166. end_time=job_table_entry.end_time,
  167. metadata=metadata,
  168. runtime_env=RuntimeEnv.deserialize(
  169. job_table_entry.config.runtime_env_info.serialized_runtime_env
  170. ).to_dict(),
  171. driver_info=driver,
  172. )
  173. jobs[job_id] = job
  174. else:
  175. driver = DriverInfo(
  176. id=job_id,
  177. node_ip_address=job_table_entry.driver_address.ip_address,
  178. pid=str(job_table_entry.driver_pid),
  179. )
  180. submission_job_drivers[job_submission_id] = driver
  181. return jobs, submission_job_drivers
  182. async def find_job_by_ids(
  183. gcs_client: GcsClient,
  184. job_info_client: JobInfoStorageClient,
  185. job_or_submission_id: str,
  186. ) -> Optional[JobDetails]:
  187. """
  188. Attempts to find the job with a given submission_id or job id.
  189. """
  190. # First try to find by job_id
  191. driver_jobs, submission_job_drivers = await get_driver_jobs(
  192. gcs_client, job_or_submission_id=job_or_submission_id
  193. )
  194. job = driver_jobs.get(job_or_submission_id)
  195. if job:
  196. return job
  197. # Try to find a driver with the given id
  198. submission_id = next(
  199. (
  200. id
  201. for id, driver in submission_job_drivers.items()
  202. if driver.id == job_or_submission_id
  203. ),
  204. None,
  205. )
  206. if not submission_id:
  207. # If we didn't find a driver with the given id,
  208. # then lets try to search for a submission with given id
  209. submission_id = job_or_submission_id
  210. job_info = await job_info_client.get_info(submission_id)
  211. if job_info:
  212. driver = submission_job_drivers.get(submission_id)
  213. job = JobDetails(
  214. **dataclasses.asdict(job_info),
  215. submission_id=submission_id,
  216. job_id=driver.id if driver else None,
  217. driver_info=driver,
  218. type=JobType.SUBMISSION,
  219. )
  220. return job
  221. return None
  222. async def find_jobs_by_job_ids(
  223. gcs_client: GcsClient,
  224. job_info_client: JobInfoStorageClient,
  225. job_ids: List[str],
  226. ) -> Dict[str, JobDetails]:
  227. """
  228. Returns a dictionary of submission jobs with the given job ids, keyed by the job id.
  229. This only accepts job ids and not submission ids.
  230. """
  231. driver_jobs, submission_job_drivers = await get_driver_jobs(gcs_client)
  232. # Filter down to the request job_ids
  233. driver_jobs = {key: job for key, job in driver_jobs.items() if key in job_ids}
  234. submission_job_drivers = {
  235. key: job for key, job in submission_job_drivers.items() if job.id in job_ids
  236. }
  237. # Fetch job details for each job
  238. job_submission_ids = submission_job_drivers.keys()
  239. job_infos = await asyncio.gather(
  240. *[
  241. job_info_client.get_info(submission_id)
  242. for submission_id in job_submission_ids
  243. ]
  244. )
  245. return {
  246. **driver_jobs,
  247. **{
  248. submission_job_drivers.get(submission_id).id: JobDetails(
  249. **dataclasses.asdict(job_info),
  250. submission_id=submission_id,
  251. job_id=submission_job_drivers.get(submission_id).id,
  252. driver_info=submission_job_drivers.get(submission_id),
  253. type=JobType.SUBMISSION,
  254. )
  255. for job_info, submission_id in zip(job_infos, job_submission_ids)
  256. },
  257. }
  258. def fast_tail_last_n_lines(
  259. path: str,
  260. num_lines: int,
  261. max_chars: int,
  262. block_size: int = 8192,
  263. ) -> str:
  264. """Return the last ``num_lines`` lines from a large log file efficiently.
  265. This function avoids scanning the entire file. It seeks to the end of
  266. the file and reads backwards in fixed-size blocks until enough lines are
  267. collected. This is much faster for large files compared to using
  268. ``file_tail_iterator()``, which performs a full sequential scan.
  269. Args:
  270. path: The file path to read.
  271. num_lines: Number of lines to return.
  272. max_chars: Maximum number of characters in the returned string.
  273. block_size: Read size for each backward block.
  274. Returns:
  275. A string containing at most ``num_lines`` of the last lines in the file,
  276. truncated to ``max_chars`` characters.
  277. """
  278. if num_lines < 0:
  279. raise ValueError(f"num_lines must be non-negative, got {num_lines}")
  280. if num_lines == 0:
  281. return ""
  282. if max_chars < 0:
  283. raise ValueError(f"max_chars must be non-negative, got {max_chars}")
  284. if max_chars == 0:
  285. return ""
  286. if block_size <= 0:
  287. raise ValueError(f"block_size must be positive, got {block_size}")
  288. logger.debug(
  289. f"Start reading log file {path} with num_lines={num_lines} max_chars={max_chars} block_size={block_size}"
  290. )
  291. with open(path, "rb") as f:
  292. f.seek(0, os.SEEK_END)
  293. file_size = f.tell()
  294. if file_size == 0:
  295. return ""
  296. chunks = []
  297. position = file_size
  298. newlines_found = 0
  299. # We read backwards in chunks until we have enough newlines for num_lines.
  300. # We may need one more newline to capture the content before the first newline.
  301. while position > 0 and newlines_found < num_lines + 1:
  302. read_size = min(block_size, position)
  303. position -= read_size
  304. f.seek(position)
  305. chunk = f.read(read_size)
  306. newlines_found += chunk.count(b"\n")
  307. chunks.insert(0, chunk)
  308. buffer = b"".join(chunks)
  309. lines = buffer.decode("utf-8", errors="replace").splitlines(keepends=True)
  310. if len(lines) <= num_lines:
  311. result = "".join(lines)
  312. else:
  313. result = "".join(lines[-num_lines:])
  314. return result[-max_chars:]