event_system.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. from enum import Enum, auto
  2. from typing import Any, Callable, Dict, List, Optional, Union
  3. from ray.autoscaler._private.cli_logger import cli_logger
  4. class CreateClusterEvent(Enum):
  5. """Events to track in ray.autoscaler.sdk.create_or_update_cluster.
  6. Attributes:
  7. up_started : Invoked at the beginning of create_or_update_cluster.
  8. ssh_keypair_downloaded : Invoked when the ssh keypair is downloaded.
  9. cluster_booting_started : Invoked when when the cluster booting starts.
  10. acquiring_new_head_node : Invoked before the head node is acquired.
  11. head_node_acquired : Invoked after the head node is acquired.
  12. ssh_control_acquired : Invoked when the node is being updated.
  13. run_initialization_cmd : Invoked before all initialization
  14. commands are called and again before each initialization command.
  15. run_setup_cmd : Invoked before all setup commands are
  16. called and again before each setup command.
  17. start_ray_runtime : Invoked before ray start commands are run.
  18. start_ray_runtime_completed : Invoked after ray start commands
  19. are run.
  20. cluster_booting_completed : Invoked after cluster booting
  21. is completed.
  22. """
  23. up_started = auto()
  24. ssh_keypair_downloaded = auto()
  25. cluster_booting_started = auto()
  26. acquiring_new_head_node = auto()
  27. head_node_acquired = auto()
  28. ssh_control_acquired = auto()
  29. run_initialization_cmd = auto()
  30. run_setup_cmd = auto()
  31. start_ray_runtime = auto()
  32. start_ray_runtime_completed = auto()
  33. cluster_booting_completed = auto()
  34. class _EventSystem:
  35. """Event system that handles storing and calling callbacks for events.
  36. Attributes:
  37. callback_map (Dict[str, List[Callable]]) : Stores list of callbacks
  38. for events when registered.
  39. """
  40. def __init__(self):
  41. self.callback_map = {}
  42. def add_callback_handler(
  43. self,
  44. event: str,
  45. callback: Union[Callable[[Dict], None], List[Callable[[Dict], None]]],
  46. ):
  47. """Stores callback handler for event.
  48. Args:
  49. event: Event that callback should be called on. See
  50. CreateClusterEvent for details on the events available to be
  51. registered against.
  52. callback (Callable[[Dict], None]): Callable object that is invoked
  53. when specified event occurs.
  54. """
  55. if event not in CreateClusterEvent.__members__.values():
  56. cli_logger.warning(
  57. f"{event} is not currently tracked, and this"
  58. " callback will not be invoked."
  59. )
  60. self.callback_map.setdefault(event, []).extend(
  61. [callback] if type(callback) is not list else callback
  62. )
  63. def execute_callback(
  64. self, event: CreateClusterEvent, event_data: Optional[Dict[str, Any]] = None
  65. ):
  66. """Executes all callbacks for event.
  67. Args:
  68. event: Event that is invoked. See CreateClusterEvent
  69. for details on the available events.
  70. event_data (Dict[str, Any]): Argument that is passed to each
  71. callable object stored for this particular event.
  72. """
  73. if event_data is None:
  74. event_data = {}
  75. event_data["event_name"] = event
  76. if event in self.callback_map:
  77. for callback in self.callback_map[event]:
  78. callback(event_data)
  79. def clear_callbacks_for_event(self, event: str):
  80. """Clears stored callable objects for event.
  81. Args:
  82. event: Event that has callable objects stored in map.
  83. See CreateClusterEvent for details on the available events.
  84. """
  85. if event in self.callback_map:
  86. del self.callback_map[event]
  87. global_event_system = _EventSystem()