_utils.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. from __future__ import annotations
  2. from collections.abc import Collection
  3. from typing import Annotated, Any, Final, Optional, Protocol, TypedDict, Union
  4. from pydantic import Field
  5. from typing_extensions import Self, Unpack
  6. from wandb._pydantic import GQLId, GQLInput, computed_field, model_validator, to_json
  7. from ._filters import MongoLikeFilter
  8. from ._generated import (
  9. CreateFilterTriggerInput,
  10. QueueJobActionInput,
  11. TriggeredActionConfig,
  12. UpdateFilterTriggerInput,
  13. )
  14. from ._validators import parse_input_action
  15. from .actions import (
  16. ActionType,
  17. DoNothing,
  18. InputAction,
  19. SavedAction,
  20. SendNotification,
  21. SendWebhook,
  22. )
  23. from .automations import Automation, NewAutomation
  24. from .events import (
  25. EventType,
  26. InputEvent,
  27. RunMetricFilter,
  28. SavedEvent,
  29. _WrappedSavedEventFilter,
  30. )
  31. from .scopes import AutomationScope, ScopeType
  32. INVALID_INPUT_EVENTS: Final[Collection[EventType]] = (EventType.UPDATE_ARTIFACT_ALIAS,)
  33. """Event types that should NOT be allowed as new values on new or edited automations.
  34. While we forbid new/edited automations from assigning these event types,
  35. they're defined so that we can still parse existing automations that may use them.
  36. """
  37. INVALID_INPUT_ACTIONS: Final[Collection[ActionType]] = (ActionType.QUEUE_JOB,)
  38. """Action types that should NOT be allowed as new values on new or edited automations.
  39. While we forbid new/edited automations from assigning these action types,
  40. they're defined so that we can still parse existing automations that may use them.
  41. """
  42. ALWAYS_SUPPORTED_EVENTS: Final[Collection[EventType]] = frozenset(
  43. {
  44. EventType.CREATE_ARTIFACT,
  45. EventType.LINK_ARTIFACT,
  46. EventType.ADD_ARTIFACT_ALIAS,
  47. }
  48. )
  49. """Event types that should be supported by all current, non-EOL server versions."""
  50. ALWAYS_SUPPORTED_ACTIONS: Final[Collection[ActionType]] = frozenset(
  51. {
  52. ActionType.NOTIFICATION,
  53. ActionType.GENERIC_WEBHOOK,
  54. }
  55. )
  56. """Action types that should be supported by all current, non-EOL server versions."""
  57. class HasId(Protocol):
  58. id: str
  59. def extract_id(obj: HasId | str) -> str:
  60. return obj.id if hasattr(obj, "id") else obj
  61. # ---------------------------------------------------------------------------
  62. ACTION_CONFIG_KEYS: dict[ActionType, str] = {
  63. ActionType.NOTIFICATION: "notification_action_input",
  64. ActionType.GENERIC_WEBHOOK: "generic_webhook_action_input",
  65. ActionType.NO_OP: "no_op_action_input",
  66. ActionType.QUEUE_JOB: "queue_job_action_input",
  67. }
  68. class InputActionConfig(TriggeredActionConfig):
  69. """Prepares action configuration data for saving an automation."""
  70. # NOTE: `QueueJobActionInput` for defining a Launch job is deprecated,
  71. # so while it's allowed here to update EXISTING mutations, we don't
  72. # currently expose it through the public API to create NEW automations.
  73. queue_job_action_input: Optional[QueueJobActionInput] = None
  74. notification_action_input: Optional[SendNotification] = None
  75. generic_webhook_action_input: Optional[SendWebhook] = None
  76. no_op_action_input: Optional[DoNothing] = None
  77. def prepare_action_config_input(obj: SavedAction | InputAction) -> dict[str, Any]:
  78. """Nests the action input under the correct key for `TriggeredActionConfig`.
  79. This is necessary to conform to the schemas for:
  80. - `CreateFilterTriggerInput`
  81. - `UpdateFilterTriggerInput`
  82. """
  83. # Delegate to inner validators to convert SavedAction -> InputAction types, if needed.
  84. obj = parse_input_action(obj)
  85. return InputActionConfig(**{ACTION_CONFIG_KEYS[obj.action_type]: obj}).model_dump()
  86. def prepare_event_filter_input(
  87. obj: _WrappedSavedEventFilter | MongoLikeFilter | RunMetricFilter,
  88. ) -> str:
  89. """Unnests (if needed) and serializes an `EventFilter` input to JSON.
  90. This is necessary to conform to the schemas for:
  91. - `CreateFilterTriggerInput`
  92. - `UpdateFilterTriggerInput`
  93. """
  94. # Input event filters are nested one level deeper than saved event filters.
  95. # Note that this is NOT the case for run/run metric filters.
  96. #
  97. # Yes, this is confusing. It's also necessary to conform to under-the-hood
  98. # schemas and logic in the backend.
  99. if isinstance(obj, _WrappedSavedEventFilter):
  100. return to_json(obj.filter)
  101. return to_json(obj)
  102. class WriteAutomationsKwargs(TypedDict, total=False):
  103. """Keyword arguments that can be passed to create or update an automation."""
  104. name: str
  105. description: str
  106. enabled: bool
  107. scope: AutomationScope
  108. event: InputEvent
  109. action: InputAction
  110. class ValidatedCreateInput(GQLInput, extra="forbid", frozen=True):
  111. """Validated automation parameters, prepared for creating a new automation.
  112. Note: Users should never need to instantiate this class directly.
  113. """
  114. name: str
  115. description: Optional[str] = None
  116. enabled: bool = True
  117. # ------------------------------------------------------------------------------
  118. # Set on instantiation, but used to derive other fields and deliberately
  119. # EXCLUDED from the final GraphQL request vars
  120. event: Annotated[InputEvent, Field(exclude=True)]
  121. action: Annotated[InputAction, Field(exclude=True)]
  122. # ------------------------------------------------------------------------------
  123. # Derived fields to match the input schemas
  124. @computed_field
  125. def scope_type(self) -> ScopeType:
  126. return self.event.scope.scope_type
  127. @computed_field
  128. def scope_id(self) -> GQLId:
  129. return self.event.scope.id
  130. @computed_field
  131. def triggering_event_type(self) -> EventType:
  132. return self.event.event_type
  133. @computed_field
  134. def event_filter(self) -> str:
  135. return prepare_event_filter_input(self.event.filter)
  136. @computed_field
  137. def triggered_action_type(self) -> ActionType:
  138. return self.action.action_type
  139. @computed_field
  140. def triggered_action_config(self) -> dict[str, Any]:
  141. return prepare_action_config_input(self.action)
  142. # ------------------------------------------------------------------------------
  143. # Custom validation
  144. @model_validator(mode="after")
  145. def _forbid_legacy_event_types(self) -> Self:
  146. if (type_ := self.event.event_type) in INVALID_INPUT_EVENTS:
  147. raise ValueError(f"{type_!r} events cannot be assigned to automations.")
  148. return self
  149. @model_validator(mode="after")
  150. def _forbid_legacy_action_types(self) -> Self:
  151. if (type_ := self.action.action_type) in INVALID_INPUT_ACTIONS:
  152. raise ValueError(f"{type_!r} actions cannot be assigned to automations.")
  153. return self
  154. class ValidatedUpdateInput(GQLInput, extra="ignore", frozen=True):
  155. """Validated automation parameters, prepared for updating an existing automation.
  156. Accepts both InputEvent/InputAction (user-supplied for the update) and
  157. SavedEvent/SavedAction (carried over from the existing saved automation).
  158. This avoids the coercion bug where routing through Automation(event: SavedEvent)
  159. silently drops InputEvent filters.
  160. Uses extra="ignore" (rather than "forbid") because dict(Automation) includes
  161. fields like typename__, created_at, updated_at that are not relevant for the
  162. update payload.
  163. """
  164. id: GQLId
  165. name: Optional[str] = None
  166. description: Optional[str] = None
  167. enabled: Optional[bool] = None
  168. event: Annotated[Union[InputEvent, SavedEvent], Field(exclude=True)]
  169. action: Annotated[Union[InputAction, SavedAction], Field(exclude=True)]
  170. scope: Annotated[AutomationScope, Field(exclude=True)]
  171. # --------------------------------------------------------------------------
  172. # Derived fields to match the input schemas
  173. @computed_field
  174. def scope_type(self) -> ScopeType:
  175. return self.scope.scope_type
  176. @computed_field
  177. def scope_id(self) -> GQLId:
  178. return self.scope.id
  179. @computed_field
  180. def triggering_event_type(self) -> EventType:
  181. return self.event.event_type
  182. @computed_field
  183. def event_filter(self) -> str:
  184. return prepare_event_filter_input(self.event.filter)
  185. @computed_field
  186. def triggered_action_type(self) -> ActionType:
  187. return self.action.action_type
  188. @computed_field
  189. def triggered_action_config(self) -> dict[str, Any]:
  190. return prepare_action_config_input(self.action)
  191. # --------------------------------------------------------------------------
  192. # Custom validation
  193. @model_validator(mode="after")
  194. def _forbid_legacy_event_types(self) -> Self:
  195. if (type_ := self.event.event_type) in INVALID_INPUT_EVENTS:
  196. raise ValueError(f"{type_!r} events cannot be assigned to automations.")
  197. return self
  198. @model_validator(mode="after")
  199. def _forbid_legacy_action_types(self) -> Self:
  200. if (type_ := self.action.action_type) in INVALID_INPUT_ACTIONS:
  201. raise ValueError(f"{type_!r} actions cannot be assigned to automations.")
  202. return self
  203. def prepare_to_create(
  204. obj: NewAutomation | None = None,
  205. /,
  206. **kwargs: Unpack[WriteAutomationsKwargs],
  207. ) -> CreateFilterTriggerInput:
  208. """Prepares the payload to create an automation in a GraphQL request."""
  209. # Validate all input variables, and prepare as expected by the GraphQL request.
  210. # - if an object is provided, override its fields with any keyword args
  211. # - otherwise, instantiate from the keyword args
  212. obj_dict = {**obj.model_dump(), **kwargs} if obj else kwargs # type: ignore[typeddict-item]
  213. vobj = ValidatedCreateInput(**obj_dict)
  214. return CreateFilterTriggerInput.model_validate(vobj)
  215. def prepare_to_update(
  216. obj: Automation | None = None,
  217. /,
  218. **kwargs: Unpack[WriteAutomationsKwargs],
  219. ) -> UpdateFilterTriggerInput:
  220. """Prepares the payload to update an automation in a GraphQL request."""
  221. # Validate all input variables, and prepare as expected by the GraphQL request.
  222. # - if an object is provided, override its fields with any keyword args
  223. # - otherwise, instantiate from the keyword args
  224. obj_dict = dict(obj or {}) | kwargs
  225. vobj = ValidatedUpdateInput(**obj_dict)
  226. return UpdateFilterTriggerInput.model_validate(vobj)