# dag_node_operation.py
  1. import copy
  2. import heapq
  3. import logging
  4. from collections import defaultdict
  5. from enum import Enum
  6. from functools import total_ordering
  7. from typing import Dict, List, Optional, Set, Tuple
  8. import ray
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
  10. class _DAGNodeOperationType(Enum):
  11. """
  12. There are three types of operations that a DAG node can perform:
  13. 1. READ: Read from an input channel.
  14. 2. COMPUTE: Execute the method corresponding to the node.
  15. 3. WRITE: Write to an output channel.
  16. """
  17. READ = "READ"
  18. COMPUTE = "COMPUTE"
  19. WRITE = "WRITE"
  20. def viz_str(self):
  21. """
  22. A string representation of the operation type to be used in visualization.
  23. The result string is a single character because conciseness is preferred.
  24. """
  25. if self == _DAGNodeOperationType.READ:
  26. return "R"
  27. elif self == _DAGNodeOperationType.COMPUTE:
  28. return "C"
  29. elif self == _DAGNodeOperationType.WRITE:
  30. return "W"
  31. assert False, f"Unknown operation type: {self}"
  32. class _DAGNodeOperation:
  33. def __init__(
  34. self,
  35. exec_task_idx: int,
  36. operation_type: _DAGNodeOperationType,
  37. method_name: Optional[str] = None,
  38. ):
  39. """
  40. Args:
  41. exec_task_idx: The index of the task that this operation belongs to
  42. in the actor's ExecutableTask list. The index is not the same
  43. as bind_index because there may be more tasks bound to an actor
  44. than tasks that appear in the current compiled DAG.
  45. operation_type: The type of operation to perform.
  46. method_name: The name of the method that this operation originates
  47. from. This is only for visualization and debugging purposes.
  48. """
  49. self.exec_task_idx = exec_task_idx
  50. self.type = operation_type
  51. self.method_name = method_name
  52. def __repr__(self):
  53. return (
  54. f"_DAGNodeOperation("
  55. f"exec_task_idx: {self.exec_task_idx}, "
  56. f"type: {self.type}, "
  57. f"method_name: {self.method_name})"
  58. )
  59. def viz_str(self):
  60. """
  61. A string representation of the node to be used in visualization.
  62. """
  63. return f"[{self.exec_task_idx}] {self.method_name} {self.type.viz_str()}"
  64. def __hash__(self):
  65. return hash((self.exec_task_idx, self.type))
  66. def __eq__(self, other):
  67. # An operation is uniquely identified by its `exec_task_idx` and type.
  68. # `method_name` is only for debugging purposes.
  69. return self.exec_task_idx == other.exec_task_idx and self.type == other.type
  70. @total_ordering
  71. class _DAGOperationGraphNode:
  72. def __init__(
  73. self,
  74. operation: _DAGNodeOperation,
  75. task_idx: int,
  76. actor_handle: "ray.actor.ActorHandle",
  77. requires_accelerator: bool,
  78. ):
  79. """
  80. _DAGOperationGraphNode represents a node in the DAG operation graph.
  81. It contains information about the node's in-degree, out-degree, edges,
  82. and the operation it performs.
  83. Args:
  84. operation: The operation that this node performs. The operation
  85. can be a READ, COMPUTE, or WRITE operation.
  86. task_idx: A unique index which can be used to index into
  87. `CompiledDAG.idx_to_task` to get the corresponding task.
  88. actor_handle: The actor handle to which this operation belongs.
  89. requires_accelerator: Whether this operation requires accelerator.
  90. """
  91. self.operation = operation
  92. self.task_idx = task_idx
  93. self.actor_handle = actor_handle
  94. self.requires_accelerator = requires_accelerator
  95. # The in_edges and out_edges are dicts of tuples to strings.
  96. # Each tuple (the key) contains an integer `task_idx`, which can be
  97. # used to index into `idx_to_task` to get the corresponding task,
  98. # and a `_DAGNodeOperationType`, which can be READ, COMPUTE, or WRITE.
  99. # The string (the value) is the visualization information of the edge,
  100. # it is a tuple of a label of the edge and a boolean indicating whether
  101. # the edge is a control dependency.
  102. self.in_edges: Dict[Tuple[int, _DAGNodeOperationType], Tuple[str, bool]] = {}
  103. self.out_edges: Dict[Tuple[int, _DAGNodeOperationType], Tuple[str, bool]] = {}
  104. # The synchronous nodes are all the nodes that belong to the same accelerator
  105. # operation. Each node is represented by a tuple of its task idx and type.
  106. self.sync_idxs: Set[Tuple[int, _DAGNodeOperationType]] = set()
  107. # The pending synchronous nodes are the nodes that are pending to be executed,
  108. # i.e., their in-degrees are zero. When a synchronous node is pending, it
  109. # will be added to the pending synchronous nodes of all the nodes in the
  110. # accelerator operation.
  111. self.pending_sync_idxs: Set[Tuple[int, _DAGNodeOperationType]] = set()
  112. def __repr__(self):
  113. return (
  114. f"_DAGOperationGraphNode("
  115. f"operation: {self.operation}, "
  116. f"task_idx: {self.task_idx}, "
  117. f"actor_id: {self.actor_handle._ray_actor_id}, "
  118. f"requires_accelerator: {self.requires_accelerator})"
  119. )
  120. def __lt__(self, other: "_DAGOperationGraphNode"):
  121. """
  122. This function defines the order of the nodes in the priority queue used in
  123. `_select_next_nodes`. The priority queue is a min-heap, so the node with
  124. higher priority is considered "less than" the other node.
  125. """
  126. if self.is_accelerator_op != other.is_accelerator_op:
  127. # When one node is an accelerator operation and the other is not,
  128. # prioritize the accelerator operation.
  129. return self.is_accelerator_op
  130. else:
  131. # When either both nodes are accelerator operations or both nodes
  132. # are not accelerator operations, prioritize the earlier task within
  133. # the same actor and load balance tasks across actors. The tie is
  134. # broken by the `task_idx`.
  135. return (self.operation.exec_task_idx, self.task_idx) < (
  136. other.operation.exec_task_idx,
  137. other.task_idx,
  138. )
  139. def __eq__(self, other: "_DAGOperationGraphNode"):
  140. """
  141. Two operations are equal only when they have the same `exec_task_idx` and `type`
  142. and belong to the same actor.
  143. """
  144. return (
  145. self.actor_handle == other.actor_handle
  146. and self.operation.exec_task_idx == other.operation.exec_task_idx
  147. and self.operation.type == other.operation.type
  148. )
  149. def __hash__(self):
  150. """
  151. An operation is uniquely identified by its `task_idx` and type.
  152. """
  153. return hash((self.operation, self.task_idx))
  154. @property
  155. def in_degree(self) -> int:
  156. return len(self.in_edges)
  157. @property
  158. def is_ready(self) -> bool:
  159. """
  160. If a node is not an accelerator operation, it is ready when it has a zero
  161. in-degree.
  162. If it is an accelerator operation, it is ready when all the nodes in the
  163. operation have zero in-degrees.
  164. """
  165. return self.in_degree == 0 and (
  166. len(self.pending_sync_idxs) == len(self.sync_idxs)
  167. )
  168. @property
  169. def is_read(self) -> bool:
  170. return self.operation.type == _DAGNodeOperationType.READ
  171. @property
  172. def is_accelerator_read(self) -> bool:
  173. """
  174. A node is an accelerator read if it is a read node and requires accelerator.
  175. """
  176. return (
  177. self.operation.type == _DAGNodeOperationType.READ
  178. and self.requires_accelerator
  179. )
  180. @property
  181. def is_accelerator_compute(self) -> bool:
  182. """
  183. A node is an accelerator compute if it is a compute node and requires accelerator.
  184. """
  185. return (
  186. self.operation.type == _DAGNodeOperationType.COMPUTE
  187. and self.requires_accelerator
  188. )
  189. @property
  190. def is_accelerator_write(self) -> bool:
  191. """
  192. A node is an accelerator write if it is a write node and requires accelerator.
  193. """
  194. return (
  195. self.operation.type == _DAGNodeOperationType.WRITE
  196. and self.requires_accelerator
  197. )
  198. @property
  199. def is_accelerator_op(self) -> bool:
  200. return (
  201. self.is_accelerator_read
  202. or self.is_accelerator_compute
  203. or self.is_accelerator_write
  204. )
  205. def viz_str(self):
  206. """
  207. A string representation of the node to be used in visualization.
  208. """
  209. return self.operation.viz_str()
  210. @property
  211. def _actor_id(self):
  212. return self.actor_handle._ray_actor_id.hex()
  213. def _add_edge(
  214. from_node: _DAGOperationGraphNode,
  215. to_node: _DAGOperationGraphNode,
  216. label: str = "",
  217. control_dependency: bool = False,
  218. ):
  219. """
  220. Add an edge from `from_node` to `to_node`.
  221. Args:
  222. from_node: The node from which the edge originates.
  223. to_node: The node to which the edge points.
  224. label: The label of the edge. This will be used to annotate the edge
  225. in the visualization of the execution schedule.
  226. """
  227. from_node.out_edges[(to_node.task_idx, to_node.operation.type)] = (
  228. label,
  229. control_dependency,
  230. )
  231. to_node.in_edges[(from_node.task_idx, from_node.operation.type)] = (
  232. label,
  233. control_dependency,
  234. )
  235. def _update_pending_sync_idxs(
  236. graph: Dict[int, Dict[_DAGNodeOperationType, _DAGOperationGraphNode]],
  237. node: _DAGOperationGraphNode,
  238. ) -> None:
  239. """
  240. Update the node as pending for its synchronous nodes.
  241. """
  242. idx = (node.task_idx, node.operation.type)
  243. for task_idx, op_type in node.sync_idxs:
  244. sync_node = graph[task_idx][op_type]
  245. sync_node.pending_sync_idxs.add(idx)
  246. def _push_candidate_node_if_ready(
  247. actor_to_candidates: Dict["ray._raylet.ActorID", List[_DAGOperationGraphNode]],
  248. graph: Dict[int, Dict[_DAGNodeOperationType, _DAGOperationGraphNode]],
  249. node: _DAGOperationGraphNode,
  250. ) -> None:
  251. """
  252. Push the node with a zero in-degree to the candidates if its operation is ready.
  253. If it has synchronous nodes, its accelerator operation is not ready until all
  254. the nodes are pending, then all the nodes will be pushed to the candidates.
  255. """
  256. assert node.in_degree == 0, "Expected to have a zero in-degree"
  257. # For the accelerator write node, update the in-degrees of the downstream
  258. # accelerator read nodes and update them as pending. This is necessary because
  259. # the data dependency edges between accelerator write and read nodes are only
  260. # updated here. The accelerator P2P operation becomes ready after both the write
  261. # and read nodes are marked as pending.
  262. if node.is_accelerator_write:
  263. for task_idx, op_type in node.out_edges:
  264. read_node = graph[task_idx][op_type]
  265. read_node.in_edges.pop((node.task_idx, node.operation.type))
  266. assert read_node.is_accelerator_read and len(read_node.in_edges) == 0
  267. _update_pending_sync_idxs(graph, read_node)
  268. # For the accelerator operation node, update it as pending.
  269. if len(node.sync_idxs) != 0:
  270. _update_pending_sync_idxs(graph, node)
  271. # The accelerator operation is ready when all the nodes have zero in-degrees.
  272. # When the last node in the operation is updated as pending, push all the nodes
  273. # to the candidates.
  274. if node.is_ready:
  275. if len(node.sync_idxs) == 0:
  276. heapq.heappush(
  277. actor_to_candidates[node.actor_handle._actor_id],
  278. node,
  279. )
  280. else:
  281. for task_idx, op_type in node.sync_idxs:
  282. sync_node = graph[task_idx][op_type]
  283. heapq.heappush(
  284. actor_to_candidates[sync_node.actor_handle._actor_id],
  285. sync_node,
  286. )
  287. def _select_next_nodes(
  288. actor_to_candidates: Dict["ray._raylet.ActorID", List[_DAGOperationGraphNode]],
  289. graph: Dict[int, Dict[_DAGNodeOperationType, _DAGOperationGraphNode]],
  290. ) -> Optional[List[_DAGOperationGraphNode]]:
  291. """
  292. This function selects the next nodes for the topological sort to generate
  293. execution schedule. If there are multiple candidate _DAGOperationGraphNodes,
  294. select the node with the top priority. The priority is defined in
  295. `_DAGOperationGraphNode.__lt__`.
  296. For the implementation details, we maintain a priority queue for each actor,
  297. where the head of the priority queue is the node with the smallest `exec_task_idx`.
  298. When a node has a zero in-degree, it is added to the corresponding actor's
  299. priority queue. For a node other than an accelerator collective node, it is ready to be
  300. executed if it has a zero in-degree. For an accelerator collective node, it is ready to
  301. be executed when all the nodes in its collective operation have zero in-degrees.
  302. If a node is an accelerator collective node, it updates the `ready_collective_nodes` of
  303. all the nodes in its collective operation. Unless all the nodes in its collective
  304. group have zero in-degrees, this node is removed from the candidate list.
  305. Eventually, exactly one accelerator collective node from its collective operation is
  306. selected from the candidate list.
  307. If the selected node is an accelerator write node, select all the downstream accelerator
  308. read nodes. If the selected node is an accelerator collective node, select all the accelerator
  309. compute nodes in its collective operation.
  310. Args:
  311. actor_to_candidates: A dictionary mapping an actor id to a list of
  312. candidate nodes. The list is maintained as a priority queue, so
  313. the head of the queue, i.e., `candidates[0]`, is the node with
  314. the smallest `bind_index`.
  315. graph: A dictionary mapping the index of a task to a dictionary of its
  316. _DAGOperationGraphNodes for different operations.
  317. Returns:
  318. A list of _DAGOperationGraphNodes to be placed into the corresponding
  319. execution schedules.
  320. """
  321. top_priority_node = None
  322. for candidates in actor_to_candidates.values():
  323. if len(candidates) == 0:
  324. continue
  325. if top_priority_node is None or candidates[0] < top_priority_node:
  326. top_priority_node = candidates[0]
  327. if top_priority_node is None:
  328. return None
  329. next_nodes = [top_priority_node]
  330. # Select all the synchronous nodes in the accelerator operation.
  331. if len(top_priority_node.sync_idxs) != 0:
  332. for task_idx, op_type in top_priority_node.sync_idxs:
  333. node = graph[task_idx][op_type]
  334. if node != top_priority_node:
  335. next_nodes.append(node)
  336. # Remove the selected nodes from the candidates.
  337. for node in next_nodes:
  338. candidates = actor_to_candidates[node.actor_handle._actor_id]
  339. candidates.remove(node)
  340. heapq.heapify(candidates)
  341. # Remove the selected nodes from the candidates.
  342. for node in next_nodes:
  343. candidates = actor_to_candidates[node.actor_handle._actor_id]
  344. # The accelerator read nodes are not added to the candidates.
  345. if node in candidates:
  346. candidates.remove(node)
  347. heapq.heapify(candidates)
  348. return next_nodes
  349. def _build_dag_node_operation_graph(
  350. idx_to_task: Dict[int, "ray.dag.compiled_dag_node.CompiledTask"],
  351. actor_to_operation_nodes: Dict[
  352. "ray.actor.ActorHandle", List[List[_DAGOperationGraphNode]]
  353. ],
  354. ) -> Dict[int, Dict[_DAGNodeOperationType, _DAGOperationGraphNode]]:
  355. """
  356. Generate a DAG node operation graph by adding edges based on the
  357. following rules:
  358. #1 Add edges from READ to COMPUTE, and from COMPUTE to WRITE, which
  359. belong to the same task.
  360. #2 Add an edge from COMPUTE with bind_index i to COMPUTE with bind_index
  361. i+1 if they belong to the same actor.
  362. #3 Add an edge from WRITE of the writer task to READ of the reader task.
  363. This is the step one of building an execution schedule for each actor.
  364. Args:
  365. idx_to_task: A dictionary that maps the `task_idx` to the `CompiledTask`.
  366. `CompiledTask` contains information about a DAGNode and its downstream
  367. nodes.
  368. actor_to_operation_nodes: A dictionary that maps an actor handle to
  369. a list of lists of _DAGOperationGraphNode. For the same actor, the
  370. index of the outer list corresponds to the index of the ExecutableTask
  371. in the list of `executable_tasks` in `actor_to_executable_tasks`. In
  372. the inner list, the order of operations is READ, COMPUTE, and WRITE.
  373. Returns:
  374. A graph where each node is a _DAGOperationGraphNode. The key is `task_idx`,
  375. the index to retrieve its task from `idx_to_task`, and the value is a
  376. dictionary that maps the _DAGNodeOperationType (READ, COMPUTE, or WRITE)
  377. to the corresponding _DAGOperationGraphNode
  378. """
  379. assert idx_to_task
  380. graph: Dict[int, Dict[_DAGNodeOperationType, _DAGOperationGraphNode]] = {}
  381. for _, operation_nodes_list in actor_to_operation_nodes.items():
  382. prev_compute_node = None
  383. for operation_nodes in operation_nodes_list:
  384. task_idx = operation_nodes[0].task_idx
  385. read_node, compute_node, write_node = (
  386. operation_nodes[0],
  387. operation_nodes[1],
  388. operation_nodes[2],
  389. )
  390. # Add edges from READ to COMPUTE, and from COMPUTE to WRITE, which
  391. # belong to the same task.
  392. _add_edge(read_node, compute_node)
  393. _add_edge(compute_node, write_node)
  394. # Add an edge from COMPUTE with `bind_index` i to COMPUTE with
  395. # `bind_index` i+1 if they belong to the same actor.
  396. if prev_compute_node is not None:
  397. _add_edge(prev_compute_node, compute_node, "", True)
  398. prev_compute_node = compute_node
  399. assert task_idx not in graph
  400. graph[task_idx] = {
  401. _DAGNodeOperationType.READ: read_node,
  402. _DAGNodeOperationType.COMPUTE: compute_node,
  403. _DAGNodeOperationType.WRITE: write_node,
  404. }
  405. # Import `ray.dag` here to avoid circular import.
  406. from ray.dag import ClassMethodNode, CollectiveOutputNode, MultiOutputNode
  407. from ray.dag.collective_node import _CollectiveOperation
  408. # Add an edge from WRITE of the writer task to READ of the reader task.
  409. # Set synchronous nodes for accelerator P2P operations.
  410. for task_idx, task in idx_to_task.items():
  411. if not (
  412. isinstance(task.dag_node, ClassMethodNode)
  413. or isinstance(task.dag_node, CollectiveOutputNode)
  414. ):
  415. # The graph is used to generate an execution schedule for each actor.
  416. # The edge from the InputNode has no impact on the final execution
  417. # schedule.
  418. continue
  419. if (
  420. isinstance(task.dag_node, ClassMethodNode)
  421. and task.dag_node.is_class_method_output
  422. ):
  423. # Class method output node dependencies are handled at its upstream:
  424. # i.e., class method node
  425. continue
  426. for downstream_task_idx in task.downstream_task_idxs:
  427. downstream_dag_node = idx_to_task[downstream_task_idx].dag_node
  428. if isinstance(downstream_dag_node, MultiOutputNode):
  429. continue
  430. write_node = graph[task_idx][_DAGNodeOperationType.WRITE]
  431. if (
  432. isinstance(downstream_dag_node, ClassMethodNode)
  433. and downstream_dag_node.is_class_method_output
  434. ):
  435. consumer_idxs = idx_to_task[downstream_task_idx].downstream_task_idxs
  436. for consumer_idx in consumer_idxs:
  437. if consumer_idx in graph:
  438. read_node = graph[consumer_idx][_DAGNodeOperationType.READ]
  439. _add_edge(
  440. write_node,
  441. read_node,
  442. "accelerator" if write_node.requires_accelerator else "shm",
  443. )
  444. if write_node.requires_accelerator:
  445. idxs = {
  446. (task_idx, _DAGNodeOperationType.WRITE),
  447. (consumer_idx, _DAGNodeOperationType.READ),
  448. }
  449. for node in [write_node, read_node]:
  450. node.sync_idxs.update(idxs)
  451. continue
  452. read_node = graph[downstream_task_idx][_DAGNodeOperationType.READ]
  453. _add_edge(
  454. write_node,
  455. read_node,
  456. "accelerator" if write_node.requires_accelerator else "shm",
  457. )
  458. if write_node.requires_accelerator:
  459. idxs = {
  460. (task_idx, _DAGNodeOperationType.WRITE),
  461. (downstream_task_idx, _DAGNodeOperationType.READ),
  462. }
  463. for node in [write_node, read_node]:
  464. node.sync_idxs.update(idxs)
  465. # Set synchronous nodes for accelerator collective operations.
  466. collective_op_to_idxs: Dict[
  467. _CollectiveOperation, Set[Tuple[int, _DAGNodeOperationType]]
  468. ] = defaultdict(set)
  469. for task_idx, task in idx_to_task.items():
  470. if (
  471. isinstance(task.dag_node, CollectiveOutputNode)
  472. and not task.dag_node.is_class_method_output
  473. ):
  474. collective_op_to_idxs[task.dag_node.collective_op].add(
  475. (task_idx, _DAGNodeOperationType.COMPUTE)
  476. )
  477. for idxs in collective_op_to_idxs.values():
  478. for task_idx, op_type in idxs:
  479. graph[task_idx][op_type].sync_idxs = idxs
  480. return graph
  481. def _actor_viz_label(actor: "ray.actor.ActorHandle"):
  482. """
  483. Returns the label of an actor in the visualization of the execution schedule.
  484. Args:
  485. actor: The actor to be represented.
  486. """
  487. class_name = actor._ray_actor_creation_function_descriptor.class_name
  488. actor_id = actor._ray_actor_id.hex()
  489. return f"Actor class name: {class_name}\nActor ID: {actor_id}"
  490. def _node_viz_id_and_label(
  491. node: _DAGOperationGraphNode, idx: int, optimized_index: int
  492. ):
  493. """
  494. Returns the visualization id and label of a node. The visualization id is unique
  495. across all nodes.
  496. Args:
  497. node: The node to be represented.
  498. idx: The index of the node in the execution schedule.
  499. optimized_index: The index of the node in the optimized execution schedule.
  500. """
  501. node_viz_label = node.viz_str() + f" {idx},{optimized_index}"
  502. node_viz_id = f"{node._actor_id}_{node_viz_label}"
  503. return node_viz_id, node_viz_label
  504. def _visualize_execution_schedule(
  505. actor_to_execution_schedule: Dict[
  506. "ray.actor.ActorHandle", List[_DAGOperationGraphNode]
  507. ],
  508. actor_to_overlapped_schedule: Optional[
  509. Dict["ray.actor.ActorHandle", List[_DAGOperationGraphNode]]
  510. ],
  511. graph: Dict[int, Dict[_DAGNodeOperationType, _DAGOperationGraphNode]],
  512. ):
  513. """
  514. Visualize the execution schedule for each actor.
  515. The visualization will be saved as a PNG file named `compiled_graph_schedule.png`.
  516. Details of the visualization: # noqa
  517. Node description format:
  518. [<task_index>] <method_name> <operation> <orig_index>, <overlap_index>
  519. Node description fields:
  520. operation: is R(READ), C(COMPUTE), or W(WRITE)
  521. orig_index: the index in the original execution schedule
  522. overlap_index: the index in the overlap-communication optimized execution schedule
  523. If this is different from orig_index, the node is highlighted in red color
  524. Node grouping:
  525. The nodes belonging to the same actor are grouped in the same rectangle
  526. The actor class name and the actor id are shown in the rectangle
  527. Edges:
  528. black color (without label): data dependency
  529. black color (annotated with "shm"): shared memory channel
  530. blue color (annotated with "accelerator): accelerator channel
  531. dashed edge: control dependency between compute operations
  532. Args:
  533. actor_to_execution_schedule: A dictionary that maps an actor handle to
  534. the execution schedule which is a list of operation nodes.
  535. actor_to_overlapped_schedule: A dictionary that maps an actor handle to the
  536. optimized execution schedule which is a list of operation nodes.
  537. graph: A graph where each node is a _DAGOperationGraphNode. The key is
  538. `task_idx`, the index to retrieve its task from `idx_to_task`, and
  539. the value is a dictionary that maps the _DAGNodeOperationType (READ,
  540. COMPUTE, or WRITE) to the corresponding _DAGOperationGraphNode. It is
  541. generated by `_build_dag_node_operation_graph`.
  542. """
  543. try:
  544. import graphviz
  545. except ImportError:
  546. raise ImportError(
  547. "Please install graphviz to visualize the execution schedule. "
  548. "You can install it by running `pip install graphviz`."
  549. )
  550. dot = graphviz.Digraph(comment="DAG")
  551. # A dictionary that maps a node to its visualization id
  552. node_to_viz_id: Dict[_DAGOperationGraphNode, str] = {}
  553. if actor_to_overlapped_schedule is None:
  554. # TODO(rui): make the visualization more concise by only displaying
  555. # the original schedule
  556. actor_to_overlapped_schedule = actor_to_execution_schedule
  557. for actor, execution_nodes in actor_to_execution_schedule.items():
  558. overlapped_schedule = actor_to_overlapped_schedule[actor]
  559. node_to_optimized_index = {
  560. node: i for i, node in enumerate(overlapped_schedule)
  561. }
  562. actor_id = actor._ray_actor_id.hex()
  563. with dot.subgraph(name=f"cluster_{actor_id}") as subgraph:
  564. subgraph.attr(rank=actor_id, label=_actor_viz_label(actor))
  565. for i, node in enumerate(execution_nodes):
  566. optimized_index = node_to_optimized_index.get(node)
  567. node_viz_id, node_viz_label = _node_viz_id_and_label(
  568. node, i, optimized_index
  569. )
  570. color = "red" if optimized_index != i else "black"
  571. subgraph.node(node_viz_id, node_viz_label, color=color)
  572. node_to_viz_id[node] = node_viz_id
  573. for actor, execution_nodes in actor_to_execution_schedule.items():
  574. for i, node in enumerate(execution_nodes):
  575. node_viz_id = node_to_viz_id[node]
  576. for out_edge, viz_info in node.out_edges.items():
  577. label, control_dependency = viz_info
  578. out_task_idx, out_op_type = out_edge
  579. out_node = graph[out_task_idx][out_op_type]
  580. out_node_viz_id = node_to_viz_id[out_node]
  581. color = "blue" if label == "accelerator" else "black"
  582. style = "dashed" if control_dependency else "solid"
  583. dot.edge(
  584. node_viz_id, out_node_viz_id, label=label, color=color, style=style
  585. )
  586. # Add legend
  587. with dot.subgraph(name="cluster_legend") as legend:
  588. legend.attr(label="Legend", labelloc="t", fontsize="20", bgcolor="lightgrey")
  589. # Single node and its explanation
  590. legend.node("example_node", "[0] bwd C 10,10\n")
  591. explanation = (
  592. '<<TABLE BORDER="0" CELLBORDER="0" CELLSPACING="0">' # noqa
  593. '<TR><TD ALIGN="LEFT"><B>Node description format:</B></TD></TR>'
  594. '<TR><TD ALIGN="LEFT">[&lt;task_index&gt;] &lt;method_name&gt; &lt;operation&gt; &lt;orig_index&gt;, &lt;overlap_index&gt;</TD></TR>' # noqa
  595. "<TR><TD></TD></TR>"
  596. '<TR><TD ALIGN="LEFT"><B>Node description fields:</B></TD></TR>'
  597. '<TR><TD ALIGN="LEFT">operation: is R(READ), C(COMPUTE), or W(WRITE)</TD></TR>' # noqa
  598. '<TR><TD ALIGN="LEFT">orig_index: the index in the original execution schedule</TD></TR>' # noqa
  599. '<TR><TD ALIGN="LEFT">overlap_index: the index in the overlap-communication optimized execution schedule</TD></TR>' # noqa
  600. '<TR><TD ALIGN="LEFT">If this is different from orig_index, the node is highlighted in <FONT COLOR="red">red color</FONT></TD></TR>' # noqa
  601. "<TR><TD></TD></TR>"
  602. '<TR><TD ALIGN="LEFT"><B>Node grouping:</B></TD></TR>'
  603. '<TR><TD ALIGN="LEFT">The nodes belonging to the same actor are grouped in the same rectangle</TD></TR>' # noqa
  604. '<TR><TD ALIGN="LEFT">The actor class name and the actor id are shown in the rectangle</TD></TR>' # noqa
  605. "<TR><TD></TD></TR>"
  606. '<TR><TD ALIGN="LEFT"><B>Edges:</B></TD></TR>'
  607. '<TR><TD ALIGN="LEFT">black color (without label): data dependency</TD></TR>' # noqa
  608. '<TR><TD ALIGN="LEFT">black color (annotated with "shm"): shared memory channel</TD></TR>' # noqa
  609. '<TR><TD ALIGN="LEFT"><FONT COLOR="blue">blue color</FONT> (annotated with "accelerator): accelerator channel</TD></TR>' # noqa
  610. '<TR><TD ALIGN="LEFT">dashed edge: control dependency between compute operations</TD></TR>' # noqa
  611. "</TABLE>>"
  612. )
  613. legend.node("example_explanation", explanation, shape="plaintext")
  614. legend.edge("example_node", "example_explanation", style="invis")
  615. logger.info(
  616. "Writing compiled graph schedule visualization "
  617. "to compiled_graph_schedule.png"
  618. )
  619. dot.render("compiled_graph_schedule", format="png", view=False)
  620. def _generate_actor_to_execution_schedule(
  621. graph: Dict[int, Dict[_DAGNodeOperationType, _DAGOperationGraphNode]],
  622. ) -> Dict["ray.actor.ActorHandle", List[_DAGOperationGraphNode]]:
  623. """
  624. Generate an execution schedule for each actor. The schedule is a list of
  625. operation nodes to be executed. The function uses a topological sort
  626. algorithm to generate the schedule.
  627. Args:
  628. graph: A graph where each node is a _DAGOperationGraphNode. The key is
  629. `task_idx`, the index to retrieve its task from `idx_to_task`, and
  630. the value is a dictionary that maps the _DAGNodeOperationType (READ,
  631. COMPUTE, or WRITE) to the corresponding _DAGOperationGraphNode. It is
  632. generated by `_build_dag_node_operation_graph`.
  633. Returns:
  634. actor_to_execution_schedule: A dictionary that maps an actor handle to
  635. the execution schedule which is a list of operation nodes to be
  636. executed.
  637. """
  638. # Mapping from the actor handle to the execution schedule which is a list
  639. # of operations to be executed.
  640. actor_to_execution_schedule: Dict[
  641. "ray.actor.ActorHandle", List[_DAGOperationGraphNode]
  642. ] = defaultdict(list)
  643. # A dictionary mapping an actor id to a list of candidate nodes. The list
  644. # is maintained as a priority queue, so the head of the queue, i.e.,
  645. # `candidates[0]`, is the node with the smallest `bind_index`.
  646. actor_to_candidates: Dict[
  647. "ray._raylet.ActorID", List[_DAGOperationGraphNode]
  648. ] = defaultdict(list)
  649. for _, node_dict in graph.items():
  650. for _, node in node_dict.items():
  651. # A node with a zero in-degree edge means all of its dependencies
  652. # have been satisfied, including both data and control dependencies.
  653. # Therefore, it is a candidate for execution.
  654. if node.in_degree == 0:
  655. _push_candidate_node_if_ready(actor_to_candidates, graph, node)
  656. visited_nodes = set()
  657. # Use topological sort algorithm to generate the execution schedule.
  658. while True:
  659. # Select a list of nodes to be executed. There are three cases:
  660. # 1. If a selected node is not an accelerator operation, only itself is returned.
  661. # 2. If a selected node is an accelerator write operation, the corresponding accelerator
  662. # read operations are also returned.
  663. # 3. If a selected node is an accelerator collective operation, all the nodes in
  664. # its collective operation are returned.
  665. nodes = _select_next_nodes(actor_to_candidates, graph)
  666. if nodes is None:
  667. break
  668. # Add the selected nodes to the execution schedule.
  669. for node in nodes:
  670. assert node not in visited_nodes
  671. visited_nodes.add(node)
  672. actor_to_execution_schedule[node.actor_handle].append(node)
  673. # Update the in-degree of the downstream nodes.
  674. for node in nodes:
  675. for out_node_task_idx, out_node_type in node.out_edges:
  676. out_node = graph[out_node_task_idx][out_node_type]
  677. if out_node in visited_nodes:
  678. # If the downstream node is already visited, it has been added
  679. # to the execution schedule. They are the accelerator read nodes in
  680. # case 2.
  681. continue
  682. out_node.in_edges.pop((node.task_idx, node.operation.type))
  683. if out_node.in_degree == 0:
  684. _push_candidate_node_if_ready(actor_to_candidates, graph, out_node)
  685. assert len(visited_nodes) == len(graph) * 3, "Expected all nodes to be visited"
  686. return actor_to_execution_schedule
  687. def _generate_overlapped_execution_schedule(
  688. actor_to_execution_schedule: Dict[
  689. "ray.actor.ActorHandle", List[_DAGOperationGraphNode]
  690. ],
  691. ) -> Dict["ray.actor.ActorHandle", List[_DAGOperationGraphNode]]:
  692. """
  693. From an existing execution schedule, generate a new schedule by overlapping
  694. computation and communication.
  695. Currently, the algorithm generates a new schedule for each actor as follows:
  696. For each accelerator read operation (i.e., recv), scan backwards to find the nearest
  697. compute node to swap with so that the accelerator read operation can be overlapped
  698. with computation.
  699. Collective operations are not yet supported.
  700. Args:
  701. actor_to_execution_schedule: A dictionary that maps an actor handle to
  702. the existing execution schedule for the actor. The schedule is a list
  703. is a list of operations to be executed.
  704. Returns:
  705. A dictionary that maps an actor handle to the overlapped execution schedule
  706. for the actor.
  707. """
  708. actor_to_overlapped_schedule: Dict[
  709. "ray.actor.ActorHandle", List[_DAGOperationGraphNode]
  710. ] = copy.deepcopy(actor_to_execution_schedule)
  711. for overlapped_schedule in actor_to_overlapped_schedule.values():
  712. for i in range(len(overlapped_schedule)):
  713. if (
  714. overlapped_schedule[i].operation.type == _DAGNodeOperationType.READ
  715. and overlapped_schedule[i].requires_accelerator
  716. ):
  717. # For each accelerator read operation (i.e., recv), scan backwards
  718. # to find the nearest compute node to swap with so that
  719. # the accelerator read operation can be overlapped with computation.
  720. for j in range(i - 1, -1, -1):
  721. if (
  722. overlapped_schedule[j].operation.type
  723. == _DAGNodeOperationType.COMPUTE
  724. ):
  725. # Found a desired compute operation, make the swap
  726. accelerator_read_op = overlapped_schedule[i]
  727. prev_ops = overlapped_schedule[j:i]
  728. overlapped_schedule[j + 1 : i + 1] = prev_ops
  729. overlapped_schedule[j] = accelerator_read_op
  730. break
  731. if (
  732. overlapped_schedule[j].operation.type
  733. == _DAGNodeOperationType.READ
  734. or overlapped_schedule[j].operation.type
  735. == _DAGNodeOperationType.WRITE
  736. ) and overlapped_schedule[j].requires_accelerator:
  737. # Found an accelerator read/write operation, skip the overlap
  738. # optimization to keep relative order of accelerator operations
  739. break
  740. return actor_to_overlapped_schedule
  741. def _extract_execution_schedule(
  742. actor_to_execution_schedule: Dict[
  743. "ray.actor.ActorHandle", List[_DAGOperationGraphNode]
  744. ],
  745. ) -> Dict["ray.actor.ActorHandle", List[_DAGNodeOperation]]:
  746. """
  747. Extract _DAGNodeOperation from _DAGOperationGraphNode in the schedule
  748. and discard unnecessary information.
  749. """
  750. return {
  751. actor: [node.operation for node in nodes]
  752. for actor, nodes in actor_to_execution_schedule.items()
  753. }