normalize.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. # mypy: allow-untyped-defs
  2. import operator
  3. from collections.abc import Callable
  4. from typing import Any, Optional
  5. import torch
  6. import torch.fx
  7. import torch.fx as fx
  8. from torch.fx import Proxy, Transformer
  9. from torch.fx.node import Argument, map_aggregate, Node, Target
  10. from torch.fx.operator_schemas import (
  11. create_type_hint,
  12. normalize_function,
  13. normalize_module,
  14. )
  15. from .schema_type_annotation import AnnotateTypesWithSchema
  16. class NormalizeArgs(Transformer):
  17. """
  18. Normalize arguments to Python targets. This means that
  19. `args/kwargs` will be matched up to the module/functional's
  20. signature and rewritten to exclusively kwargs in positional order
  21. if `normalize_to_only_use_kwargs` is true. Also populates default
  22. values. Does not support positional-only parameters or varargs
  23. parameters (*args, **kwargs).
  24. If the nodes have 'type' metadata, it will use it to disambiguate
  25. overloads. Otherwise, it will throw an error.
  26. Example usage:
  27. m = torchvision.models.resnet18()
  28. traced = torch.fx.symbolic_trace(m)
  29. traced = NormalizeArgs(traced).transform()
  30. """
  31. def __init__(
  32. self, module: torch.fx.GraphModule, normalize_to_only_use_kwargs: bool = True
  33. ):
  34. super().__init__(module)
  35. self.node_map: dict[Proxy, Node] = {}
  36. self.normalize_to_only_use_kwargs = normalize_to_only_use_kwargs
  37. def run_node(self, n: Node) -> Any:
  38. args, kwargs = self.fetch_args_kwargs_from_env(n)
  39. def get_type(arg):
  40. if isinstance(arg, fx.Node):
  41. return n.meta.get("type")
  42. return type(arg)
  43. arg_types = map_aggregate(n.args, get_type)
  44. if not isinstance(arg_types, tuple):
  45. raise AssertionError(f"Expected tuple, got {type(arg_types)}")
  46. arg_types = tuple(create_type_hint(i) for i in arg_types)
  47. kwarg_types = {k: get_type(v) for k, v in kwargs.items()}
  48. if n.op == "call_function":
  49. out = self.call_function(n.target, args, kwargs, arg_types, kwarg_types)
  50. else:
  51. out = super().run_node(n)
  52. if n.op != "output":
  53. self.node_map[out] = n
  54. out.node.meta = n.meta
  55. out.node.type = n.type
  56. return out
  57. def call_function(
  58. self,
  59. target: Target,
  60. args: tuple[Argument, ...],
  61. kwargs: dict[str, Any],
  62. arg_types: Optional[tuple[Any, ...]] = None,
  63. kwarg_types: Optional[dict[str, Any]] = None,
  64. ):
  65. if not callable(target):
  66. raise AssertionError(f"Expected callable target, got {type(target)}")
  67. new_args_and_kwargs = normalize_function(
  68. target,
  69. args, # type: ignore[arg-type]
  70. kwargs,
  71. arg_types, # type: ignore[arg-type]
  72. kwarg_types,
  73. self.normalize_to_only_use_kwargs,
  74. )
  75. if new_args_and_kwargs:
  76. new_args, new_kwargs = new_args_and_kwargs
  77. return self.tracer.create_proxy(
  78. "call_function", target, new_args, new_kwargs
  79. )
  80. else:
  81. return super().call_function(target, args, kwargs)
  82. def call_module(
  83. self, target: Target, args: tuple[Argument, ...], kwargs: dict[str, Any]
  84. ):
  85. if not isinstance(target, str):
  86. raise AssertionError(f"Expected str target, got {type(target)}")
  87. new_args_and_kwargs = normalize_module(
  88. self.module,
  89. target,
  90. args, # type: ignore[arg-type]
  91. kwargs,
  92. self.normalize_to_only_use_kwargs,
  93. )
  94. if new_args_and_kwargs:
  95. new_args, new_kwargs = new_args_and_kwargs
  96. return super().call_module(target, new_args, new_kwargs)
  97. else:
  98. return super().call_module(target, args, kwargs)
  99. class NormalizeOperators(AnnotateTypesWithSchema):
  100. """
  101. Normalize callsites that are different ways of "spelling" the same
  102. invocation into a single, canonical call. Currently supports:
  103. 1. Normalize operators (e.g. operator.add) to the `torch` ops they
  104. ultimately invoke (e.g. torch.add) when it is possible to statically
  105. reason that
  106. Example usage:
  107. m = torchvision.models.resnet18()
  108. traced = torch.fx.symbolic_trace(m)
  109. traced = NormalizeOperators(traced).transform()
  110. """
  111. binary_magic_method_remap: dict[
  112. Callable[[Any, Any], Any], Callable[[Any, Any], Any]
  113. ] = {
  114. torch.add: operator.add,
  115. torch.mul: operator.mul,
  116. torch.sub: operator.sub,
  117. torch.div: operator.truediv,
  118. torch.floor_divide: operator.floordiv,
  119. torch.remainder: operator.mod,
  120. torch.eq: operator.eq,
  121. torch.ne: operator.ne,
  122. torch.lt: operator.lt,
  123. torch.le: operator.le,
  124. torch.gt: operator.gt,
  125. torch.ge: operator.ge,
  126. }
  127. def call_function(
  128. self, target: Target, args: tuple[Argument, ...], kwargs: dict[str, Any]
  129. ):
  130. # Normalize operators according to the magic methods implemented on tensors here:
  131. # https://github.com/pytorch/pytorch/blob/28c5d90b679c6b38bf4183ec99f16d933c2f1bcd/tools/autograd/templates/python_variable_methods.cpp#L1137 # noqa: B950
  132. if not callable(target):
  133. raise AssertionError(f"Expected callable target, got {type(target)}")
  134. if target in self.binary_magic_method_remap:
  135. if len(args) != 2:
  136. return super().call_function(target, args, kwargs)
  137. lhs, rhs = args
  138. return super().call_function(
  139. target=self.binary_magic_method_remap[target],
  140. args=(lhs, rhs),
  141. kwargs={},
  142. )
  143. return super().call_function(target, args, kwargs)