profiling.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. import json
  2. import os
  3. from collections import defaultdict
  4. from dataclasses import asdict, dataclass
  5. from typing import Dict, List, Union
  6. import ray
  7. class _NullLogSpan:
  8. """A log span context manager that does nothing"""
  9. def __enter__(self):
  10. pass
  11. def __exit__(self, type, value, tb):
  12. pass
  13. PROFILING_ENABLED = "RAY_PROFILING" in os.environ
  14. NULL_LOG_SPAN = _NullLogSpan()
  15. # Colors are specified at
  16. # https://github.com/catapult-project/catapult/blob/master/tracing/tracing/base/color_scheme.html. # noqa: E501
  17. _default_color_mapping = defaultdict(
  18. lambda: "generic_work",
  19. {
  20. "worker_idle": "cq_build_abandoned",
  21. "task": "rail_response",
  22. "task:deserialize_arguments": "rail_load",
  23. "task:execute": "rail_animation",
  24. "task:store_outputs": "rail_idle",
  25. "wait_for_function": "detailed_memory_dump",
  26. "ray.get": "good",
  27. "ray.put": "terrible",
  28. "ray.wait": "vsync_highlight_color",
  29. "submit_task": "background_memory_dump",
  30. "fetch_and_run_function": "detailed_memory_dump",
  31. "register_remote_function": "detailed_memory_dump",
  32. },
  33. )
  34. @dataclass(init=True)
  35. class ChromeTracingCompleteEvent:
  36. # https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview#heading=h.lpfof2aylapb # noqa
  37. # The event categories. This is a comma separated list of categories
  38. # for the event. The categories can be used to hide events in
  39. # the Trace Viewer UI.
  40. cat: str
  41. # The string displayed on the event.
  42. name: str
  43. # The identifier for the group of rows that the event
  44. # appears in.
  45. pid: int
  46. # The identifier for the row that the event appears in.
  47. tid: int
  48. # The start time in microseconds.
  49. ts: int
  50. # The duration in microseconds.
  51. dur: int
  52. # This is the name of the color to display the box in.
  53. cname: str
  54. # The extra user-defined data.
  55. args: Dict[str, Union[str, int]]
  56. # The event type (X means the complete event).
  57. ph: str = "X"
  58. @dataclass(init=True)
  59. class ChromeTracingMetadataEvent:
  60. # https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview#bookmark=id.iycbnb4z7i9g # noqa
  61. name: str
  62. # Metadata arguments. E.g., name: <metadata_name>
  63. args: Dict[str, str]
  64. # The process id of this event. In Ray, pid indicates the node.
  65. pid: int
  66. # The thread id of this event. In Ray, tid indicates each worker.
  67. tid: int = None
  68. # M means the metadata event.
  69. ph: str = "M"
  70. def profile(event_type, extra_data=None):
  71. """Profile a span of time so that it appears in the timeline visualization.
  72. Note that this only works in the raylet code path.
  73. This function can be used as follows (both on the driver or within a task).
  74. .. testcode::
  75. import ray._private.profiling as profiling
  76. with profiling.profile("custom event", extra_data={'key': 'val'}):
  77. # Do some computation here.
  78. x = 1 * 2
  79. Optionally, a dictionary can be passed as the "extra_data" argument, and
  80. it can have keys "name" and "cname" if you want to override the default
  81. timeline display text and box color. Other values will appear at the bottom
  82. of the chrome tracing GUI when you click on the box corresponding to this
  83. profile span.
  84. Args:
  85. event_type: A string describing the type of the event.
  86. extra_data: This must be a dictionary mapping strings to strings. This
  87. data will be added to the json objects that are used to populate
  88. the timeline, so if you want to set a particular color, you can
  89. simply set the "cname" attribute to an appropriate color.
  90. Similarly, if you set the "name" attribute, then that will set the
  91. text displayed on the box in the timeline.
  92. Returns:
  93. An object that can profile a span of time via a "with" statement.
  94. """
  95. if not PROFILING_ENABLED:
  96. return NULL_LOG_SPAN
  97. worker = ray._private.worker.global_worker
  98. if worker.mode == ray._private.worker.LOCAL_MODE:
  99. return NULL_LOG_SPAN
  100. return worker.core_worker.profile_event(event_type.encode("ascii"), extra_data)
  101. def chrome_tracing_dump(
  102. tasks: List[dict],
  103. ) -> str:
  104. """Generate a chrome/perfetto tracing dump using task events.
  105. Args:
  106. tasks: List of tasks generated by a state API list_tasks(detail=True).
  107. Returns:
  108. Json serialized dump to create a chrome/perfetto tracing.
  109. """
  110. # All events from given tasks.
  111. all_events = []
  112. # Chrome tracing doesn't have a concept of "node". Instead, we use
  113. # chrome tracing's pid == ray's node.
  114. # chrome tracing's tid == ray's process.
  115. # Note that pid or tid is usually integer, but ray's node/process has
  116. # ids in string.
  117. # Unfortunately, perfetto doesn't allow to have string as a value of pid/tid.
  118. # To workaround it, we use Metadata event from chrome tracing schema
  119. # (https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview#heading=h.xqopa5m0e28f) # noqa
  120. # which allows pid/tid -> name mapping. In order to use this schema
  121. # we build node_ip/(node_ip, worker_id) -> arbitrary index mapping.
  122. # node ip address -> node idx.
  123. node_to_index = {}
  124. # Arbitrary index mapped to the ip address.
  125. node_idx = 0
  126. # (node index, worker id) -> worker idx
  127. worker_to_index = {}
  128. # Arbitrary index mapped to the (node index, worker id).
  129. worker_idx = 0
  130. for task in tasks:
  131. profiling_data = task.get("profiling_data", [])
  132. if profiling_data:
  133. node_ip_address = profiling_data["node_ip_address"]
  134. component_events = profiling_data["events"]
  135. component_type = profiling_data["component_type"]
  136. component_id = component_type + ":" + profiling_data["component_id"]
  137. if component_type not in ["worker", "driver"]:
  138. continue
  139. for event in component_events:
  140. extra_data = event["extra_data"]
  141. # Propagate extra data.
  142. extra_data["task_id"] = task["task_id"]
  143. extra_data["job_id"] = task["job_id"]
  144. extra_data["attempt_number"] = task["attempt_number"]
  145. extra_data["func_or_class_name"] = task["func_or_class_name"]
  146. extra_data["actor_id"] = task["actor_id"]
  147. event_name = event["event_name"]
  148. # build a id -> arbitrary index mapping
  149. if node_ip_address not in node_to_index:
  150. node_to_index[node_ip_address] = node_idx
  151. # Whenever new node ip is introduced, we increment the index.
  152. node_idx += 1
  153. if (
  154. node_to_index[node_ip_address],
  155. component_id,
  156. ) not in worker_to_index: # noqa
  157. worker_to_index[
  158. (node_to_index[node_ip_address], component_id)
  159. ] = worker_idx # noqa
  160. worker_idx += 1
  161. # Modify the name with the additional user-defined extra data.
  162. cname = _default_color_mapping[event["event_name"]]
  163. name = event_name
  164. if "cname" in extra_data:
  165. cname = _default_color_mapping[event["extra_data"]["cname"]]
  166. if "name" in extra_data:
  167. name = extra_data["name"]
  168. new_event = ChromeTracingCompleteEvent(
  169. cat=event_name,
  170. name=name,
  171. pid=node_to_index[node_ip_address],
  172. tid=worker_to_index[(node_to_index[node_ip_address], component_id)],
  173. ts=event["start_time"] * 1e3,
  174. dur=(event["end_time"] * 1e3) - (event["start_time"] * 1e3),
  175. cname=cname,
  176. args=extra_data,
  177. )
  178. all_events.append(asdict(new_event))
  179. for node, i in node_to_index.items():
  180. all_events.append(
  181. asdict(
  182. ChromeTracingMetadataEvent(
  183. name="process_name",
  184. pid=i,
  185. args={"name": f"Node {node}"},
  186. )
  187. )
  188. )
  189. for worker, i in worker_to_index.items():
  190. all_events.append(
  191. asdict(
  192. ChromeTracingMetadataEvent(
  193. name="thread_name",
  194. ph="M",
  195. tid=i,
  196. pid=worker[0],
  197. args={"name": worker[1]},
  198. )
  199. )
  200. )
  201. # Handle task event disabled.
  202. return json.dumps(all_events)