| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607 |
- """Testing utilities and infrastructure for Dynamo.
- This module provides a comprehensive set of testing utilities including:
- - Test result collection and validation
- - Graph manipulation and comparison tools
- - Test case management and execution helpers
- - Specialized test decorators for different Python versions and features
- - RNG state management
- - Compilation counting and monitoring
- - Debug utilities for bytecode transformation
- The utilities in this module are used across Dynamo's test suite to ensure
- consistent testing patterns and proper test isolation.
- """
- import contextlib
- import dis
- import functools
- import logging
- import os.path
- import random
- import re
- import sys
- import types
- import unittest
- from collections.abc import Callable, Generator, Sequence
- from typing import Any, Optional, overload, TypeVar, Union
- from typing_extensions import ParamSpec
- from unittest.mock import patch
- import torch
- from torch import fx
- from torch._dynamo.backends.debugging import aot_eager
- from torch._dynamo.output_graph import OutputGraph
- from . import config, eval_frame, optimize_assert, reset
- from .bytecode_transformation import (
- create_instruction,
- debug_checks,
- is_generator,
- transform_code_object,
- )
- from .guards import CheckFunctionManager, CompileId, GuardedCode
- from .types import ConvertFrameReturn, DynamoFrameType, wrap_guarded_code
- from .utils import CompileCounterInt, same
# numpy is an optional dependency: ``np`` stays ``None`` when it cannot be imported.
np: Optional[types.ModuleType] = None
try:
    import numpy as np
except ModuleNotFoundError:
    np = None

# Module-level alias for ``eval_frame.unsupported``.
unsupported = eval_frame.unsupported
# NOTE(review): nothing in this file reads ``three``; presumably it is looked up
# from elsewhere as a plain module-level global -- confirm before removing.
three = 3

log = logging.getLogger(__name__)

# Parameter specification used to annotate signature-preserving wrappers below.
_P = ParamSpec("_P")
def clone_me(x: Optional[torch.Tensor]) -> Optional[torch.Tensor]:
    """Return a detached copy of ``x`` that keeps ``x``'s ``requires_grad`` flag.

    ``None`` is passed straight through. The copy is made with
    ``detach().clone()``, so it is not connected to ``x``'s autograd history.
    """
    if x is not None:
        duplicate = x.detach().clone()
        return duplicate.requires_grad_(x.requires_grad)
    return None
def remove_optimized_module_prefix(name: str) -> str:
    """Strip one leading ``_orig_mod.`` from ``name``; other names are returned as-is."""
    # The pattern is anchored at the start, so at most one prefix is removed.
    leading_prefix = r"^_orig_mod[.]"
    return re.sub(leading_prefix, "", name)
def extract_graph_and_tracker(fn, *args, **kwargs):  # type: ignore[no-untyped-def]
    """Compile ``fn`` with ``fullgraph=True``, call it once, and return what was captured.

    Returns a ``(graph, region_tracker)`` pair: the ``graph`` of the GraphModule
    handed to the backend, and the ``region_tracker`` read from the current
    instruction translator's output while the backend was running.
    """
    from torch._dynamo.symbolic_convert import InstructionTranslator

    captured = {"gm": None, "region_tracker": None}

    def extract_graph_backend(_gm, *args, **kwargs):  # type: ignore[no-untyped-def]
        # Record the module first, then the tracker of the translator that is
        # active during this backend call; the module itself is the "compiled" result.
        captured["gm"] = _gm
        captured["region_tracker"] = (
            InstructionTranslator.current_tx().output.region_tracker
        )
        return _gm

    compiled = torch.compile(backend=extract_graph_backend, fullgraph=True)(fn)
    compiled(*args, **kwargs)
    return captured["gm"].graph, captured["region_tracker"]  # type: ignore[union-attr]
def extract_graph(fn, *args, **kwargs):  # type: ignore[no-untyped-def]
    """Run ``fn`` compiled with a fresh ``AotEagerAndRecordGraphs`` backend.

    Returns ``(result, graphs, fw_graphs, bw_graphs)``: the value produced by
    the compiled call followed by the three graph lists held by the backend.
    """
    recorder = AotEagerAndRecordGraphs()
    compiled_fn = torch.compile(backend=recorder)(fn)
    output = compiled_fn(*args, **kwargs)
    return output, recorder.graphs, recorder.fw_graphs, recorder.bw_graphs
def collect_results(
    model: torch.nn.Module, prediction: Any, loss: Any, example_inputs: Any
) -> list[Any]:
    """Gather everything about a training step that a test may want to compare.

    The returned list holds, in order: ``prediction``, ``loss``, a dict of
    parameter gradients keyed ``"<name>.grad"``, a dict of parameters, a dict
    of buffers, and then the ``.grad`` of every tensor found in
    ``example_inputs`` (looking one level into tuples and lists).

    When ``model`` is an ``eval_frame.OptimizedModule`` the ``_orig_mod.``
    prefix is removed from parameter and buffer names.
    """
    is_optimized = isinstance(model, eval_frame.OptimizedModule)

    def canonical(name: str) -> str:
        return remove_optimized_module_prefix(name) if is_optimized else name

    grads = {}
    params = {}
    for raw_name, param in model.named_parameters():
        key = canonical(raw_name)
        # Treat None and zero grad as same
        grads[key + ".grad"] = (
            torch.zeros_like(param) if param.grad is None else param.grad
        )
        params[key] = param

    buffers = {canonical(raw_name): buf for raw_name, buf in model.named_buffers()}

    results = [prediction, loss, grads, params, buffers]
    for example in example_inputs:
        if isinstance(example, (tuple, list)):
            results.extend(inp.grad for inp in example if isinstance(inp, torch.Tensor))
        elif isinstance(example, torch.Tensor):
            results.append(example.grad)
    return results
def requires_bwd_pass(out: Any) -> bool:
    """Report whether any tensor inside ``out`` has ``requires_grad`` set.

    Tensors, arbitrarily nested lists/tuples, ``None`` and ints are accepted;
    ``None`` and ints never require a backward pass.

    Raises:
        NotImplementedError: for any other type.
    """
    if isinstance(out, torch.Tensor):
        return out.requires_grad
    if isinstance(out, (list, tuple)):
        return any(requires_bwd_pass(item) for item in out)
    if out is None or isinstance(out, int):
        return False
    raise NotImplementedError("Don't know how to reduce", type(out))
@overload
def reduce_to_scalar_loss(out: torch.Tensor) -> torch.Tensor: ...


@overload
def reduce_to_scalar_loss(
    out: Union[list[Any], tuple[Any, ...], dict[Any, Any]],
) -> float: ...


def reduce_to_scalar_loss(out: Any) -> Union[torch.Tensor, float]:
    """Collapse a model output into a single scalar usable as a loss.

    * tensor: sum of the elements divided by their count;
    * list / tuple: average of the reduced elements;
    * objects whose class is named ``MaskedLMOutput``, ``Seq2SeqLMOutput`` or
      ``CausalLMOutputWithCrossAttentions``: reduction of their ``logits``;
    * objects whose class is named ``SquashedNormal``: ``out.mean.sum()``;
    * dict: average of the reduced values.

    Raises:
        NotImplementedError: for any other type.
    """
    if isinstance(out, torch.Tensor):
        # Mean does not work on integer tensors
        total = out.sum()
        return total / out.numel()
    if isinstance(out, (list, tuple)):
        return sum(reduce_to_scalar_loss(item) for item in out) / len(out)

    # Some outputs are recognised by class name only, before the dict check.
    kind = type(out).__name__
    if kind in (
        "MaskedLMOutput",
        "Seq2SeqLMOutput",
        "CausalLMOutputWithCrossAttentions",
    ):
        return reduce_to_scalar_loss(out.logits)
    if kind == "SquashedNormal":
        return out.mean.sum()
    if isinstance(out, dict):
        reduced = sum(reduce_to_scalar_loss(value) for value in out.values())
        return reduced / len(out.keys())
    raise NotImplementedError("Don't know how to reduce", type(out))
def debug_dir() -> str:
    """Return the path of the ``../debug`` directory relative to this module.

    The directory is created on first use. The returned path is the
    un-normalised join of this file's directory and ``"../debug"``.
    """
    path = os.path.join(os.path.dirname(__file__), "../debug")
    # exist_ok=True makes creation atomic from the caller's point of view: a
    # separate exists()/mkdir() pair can raise FileExistsError when two
    # processes reach this point at the same time.
    os.makedirs(path, exist_ok=True)
    return path
def debug_dump(name: str, code: types.CodeType, extra: str = "") -> None:
    """Write a description of ``code`` to the file ``name`` inside ``debug_dir()``.

    The file contains ``dis.Bytecode(code).info()``, the disassembly, and then
    ``extra``, each separated by a blank line.
    """
    target = os.path.join(debug_dir(), name)
    with open(target, "w") as fd:
        bytecode_info = dis.Bytecode(code).info()
        disassembly = dis.Bytecode(code).dis()
        fd.write(f"{bytecode_info}\n\n{disassembly}\n\n{extra}\n")
def debug_insert_nops(
    frame: DynamoFrameType, cache_size: int, hooks: Any, _: Any, *, skip: int = 0
) -> ConvertFrameReturn:
    """Frame-conversion callback that prepends two NOPs to a frame's bytecode.

    Used to debug jump updates. ``cache_size``, ``hooks``, ``_`` and ``skip``
    are accepted but not read by this function.

    Returns an empty ``ConvertFrameReturn`` for generator code objects;
    otherwise the transformed code wrapped via ``wrap_guarded_code``.
    """

    def insert_nops(instructions: list[Any], code_options: Any) -> None:
        # Two separate inserts at index 0 => the code now starts with two NOPs.
        instructions.insert(0, create_instruction("NOP"))
        instructions.insert(0, create_instruction("NOP"))

    metrics_context = torch._dynamo.utils.get_metrics_context()
    with torch._dynamo.utils.dynamo_timed("debug_insert_nops"), metrics_context:
        # Generator code objects are left untouched.
        if is_generator(frame.f_code):
            return ConvertFrameReturn()

        debug_checks(frame.f_code)
        # NOTE: this rebinds the ``_`` parameter to the second element returned
        # by transform_code_object.
        code, _ = transform_code_object(frame.f_code, insert_nops)
        # A placeholder OutputGraph (no compiler, no root translator) built only
        # so that CheckFunctionManager below has a graph to work with.
        graph = OutputGraph(
            code_options={},
            compiler_fn=None,
            root_tx=None,  # type: ignore[arg-type]
            export=False,
            export_constraints=[],
            frame_state={"_id": 0},
            # TODO: shouldn't this be f_locals/f_globals from frame?
            # As written, these are the locals of *this* function and the
            # globals of *this* module, so local names here are part of the input.
            local_scope=locals(),
            global_scope=globals(),
            f_code=frame.f_code,
            torch_function_mode_stack=[],
            package=None,
        )

        return wrap_guarded_code(
            GuardedCode(
                code,
                CheckFunctionManager(frame.f_code, graph).guard_manager,  # type: ignore[arg-type]
                CompileId(frame_id=0, frame_compile_id=0),
            )
        )
class CompileCounter:
    """Backend callable that counts compiled frames and ``call*`` graph nodes.

    ``frame_count`` goes up by one per invocation; ``op_count`` goes up by the
    number of graph nodes whose ``op`` contains ``"call"``. The graph runs
    unmodified (``gm.forward`` is returned).
    """

    def __init__(self) -> None:
        self.frame_count: Union[int, CompileCounterInt] = 0
        self.clear()

    def __call__(
        self, gm: torch.fx.GraphModule, example_inputs: list[torch.Tensor]
    ) -> Callable[..., Any]:
        self.frame_count += 1
        self.op_count += sum(1 for node in gm.graph.nodes if "call" in node.op)
        return gm.forward

    def clear(self) -> None:
        """Reset both counters to zero.

        ``frame_count`` becomes a ``CompileCounterInt`` when
        ``config.debug_disable_compile_counter`` is set, a plain ``int`` otherwise.
        """
        self.frame_count = (
            CompileCounterInt(0) if config.debug_disable_compile_counter else 0
        )
        self.op_count = 0
class CompileCounterWithBackend:
    """Counting backend that records each graph and delegates to a named backend.

    Every invocation increments ``frame_count``, adds the number of nodes whose
    ``op`` contains ``"call"`` to ``op_count``, appends the GraphModule to
    ``graphs``, and returns whatever the backend found by
    ``lookup_backend(self.backend)`` produces.
    """

    def __init__(self, backend: str) -> None:
        self.frame_count: Union[int, CompileCounterInt] = 0
        self.backend = backend
        self.graphs: list[torch.fx.GraphModule] = []
        self.clear()

    def __call__(
        self, gm: torch.fx.GraphModule, example_inputs: list[torch.Tensor]
    ) -> Callable[..., Any]:
        from .backends.registry import lookup_backend

        self.frame_count += 1
        self.op_count += sum(1 for node in gm.graph.nodes if "call" in node.op)
        self.graphs.append(gm)
        compile_with = lookup_backend(self.backend)
        return compile_with(gm, example_inputs)

    def clear(self) -> None:
        """Reset the counters and forget all recorded graphs.

        ``frame_count`` becomes a ``CompileCounterInt`` when
        ``config.debug_disable_compile_counter`` is set, a plain ``int`` otherwise.
        """
        self.frame_count = (
            CompileCounterInt(0) if config.debug_disable_compile_counter else 0
        )
        self.op_count = 0
        self.graphs = []
# Equivalent to backend="eager", but also records graphs that
# we can assert on
class EagerAndRecordGraphs:
    """Backend that stores every GraphModule it receives and runs it unmodified."""

    def __init__(self) -> None:
        # Graph modules in the order they were handed to this backend.
        self.graphs: list[torch.fx.GraphModule] = []

    def __call__(
        self, gm: torch.fx.GraphModule, example_inputs: list[torch.Tensor]
    ) -> Callable[..., Any]:
        self.graphs.append(gm)
        forward = gm.forward
        return forward
class AotEagerAndRecordGraphs:
    """``aot_eager`` backend that records the graphs it sees.

    ``graphs`` receives every GraphModule passed to the backend; ``fw_graphs``
    and ``bw_graphs`` receive the modules passed to the ``fw_compiler`` and
    ``bw_compiler`` callbacks given to ``aot_eager``.
    """

    def __init__(self) -> None:
        self.graphs: list[torch.fx.GraphModule] = []
        self.fw_graphs: list[torch.fx.GraphModule] = []
        self.bw_graphs: list[torch.fx.GraphModule] = []

    def __call__(
        self, gm: torch.fx.GraphModule, example_inputs: list[torch.Tensor]
    ) -> Callable[..., Any]:
        self.graphs.append(gm)

        def recording_compiler(attr: str) -> Callable[..., Any]:
            # The list is looked up on ``self`` at call time, so a list that was
            # reassigned after this backend ran is still the one that gets filled.
            def compiler(
                gm: torch.fx.GraphModule, example_inputs: list[torch.Tensor]
            ) -> Callable[..., Any]:
                getattr(self, attr).append(gm)
                return gm.forward

            return compiler

        return aot_eager(
            gm,
            example_inputs,
            fw_compiler=recording_compiler("fw_graphs"),
            bw_compiler=recording_compiler("bw_graphs"),
        )
class InductorAndRecordGraphs:
    """Inductor backend that records the graphs it sees.

    ``graphs`` receives every GraphModule passed to the backend;
    ``inductor_graphs`` receives the first positional argument of every
    ``_compile_fx_inner`` call made while ``compile_fx`` runs.
    """

    def __init__(self) -> None:
        self.graphs: list[torch.fx.GraphModule] = []
        self.inductor_graphs: list[torch.fx.GraphModule] = []

    def __call__(self, gm, example_inputs):  # type: ignore[no-untyped-def]
        import torch._inductor.compile_fx as compile_fx_mod

        self.graphs.append(gm)
        original_inner = compile_fx_mod._compile_fx_inner

        def patched(*args, **kwargs):  # type: ignore[no-untyped-def]
            # Record, then defer to the real implementation captured above.
            self.inductor_graphs.append(args[0])
            return original_inner(*args, **kwargs)

        # The patch is only active for the duration of this compile_fx call.
        with patch.object(compile_fx_mod, "_compile_fx_inner", new=patched):
            compiled = compile_fx_mod.compile_fx(gm, example_inputs)
        return compiled
def strip_comment(code: str) -> str:
    """Delete every line that holds only a ``#`` comment (optionally space-indented).

    The line's trailing newline is removed with it. Comments that follow code
    on the same line are left alone.
    """
    comment_only_line = re.compile(r"(?m)^ *#.*\n?")
    return comment_only_line.sub("", code)
def remove_trailing_space(code: str) -> str:
    """Right-strip whitespace from every ``\\n``-separated line of ``code``."""
    return "\n".join(map(str.rstrip, code.split("\n")))
def _squash_blank_lines(code: str) -> str:
    """Collapse each run of consecutive blank lines into its first line.

    A line counts as blank when it is empty or whitespace-only; the first line
    of a run is kept exactly as written.
    """
    kept: list[str] = []
    previous_blank = False
    for line in code.split("\n"):
        blank = line.strip() == ""
        if not (blank and previous_blank):
            kept.append(line)
        previous_blank = blank
    return "\n".join(kept)
def normalize_gm(gm_str: str) -> str:
    """Normalise printed graph-module text so it can be compared across systems.

    Comment-only lines are removed, trailing whitespace is stripped from every
    line, and runs of blank lines are collapsed, in that order.
    """
    # strip comments as comments have path to files which may differ from
    # system to system.
    return _squash_blank_lines(remove_trailing_space(strip_comment(gm_str)))
def empty_line_normalizer(code: str) -> str:
    """
    Normalize code: remove empty lines.

    Every run of ``\\r`` / ``\\n`` characters is replaced by a single ``\\n``.
    """
    line_breaks = re.compile(r"[\r\n]+")
    return line_breaks.sub("\n", code)
def standard_test(
    self: Any,
    fn: Callable[..., Any],
    nargs: int,
    expected_ops: Optional[int] = None,
    expected_ops_dynamic: Optional[int] = None,
    expected_frame_count: int = 1,
) -> None:
    """Check that ``fn`` gives the same results under ``optimize_assert`` as in eager.

    ``fn`` is called with two independent sets of ``nargs`` random 10x10
    tensors, eagerly and then twice each through the optimized function. The
    results must match, the number of compiled frames must equal
    ``expected_frame_count`` and, when an expectation is given, the op count
    must match too. ``expected_ops_dynamic`` replaces ``expected_ops`` when
    ``config.assume_static_by_default`` is off.
    """
    if not config.assume_static_by_default and expected_ops_dynamic is not None:
        expected_ops = expected_ops_dynamic

    counter = CompileCounter()

    args1 = [torch.randn(10, 10) for _ in range(nargs)]
    args2 = [torch.randn(10, 10) for _ in range(nargs)]
    correct1 = fn(*args1)
    correct2 = fn(*args2)

    reset()
    opt_fn = optimize_assert(counter)(fn)
    # Alternate the two input sets so each one is seen twice.
    val1a = opt_fn(*args1)
    val2a = opt_fn(*args2)
    val1b = opt_fn(*args1)
    val2b = opt_fn(*args2)
    reset()

    comparisons = (
        (val1a, correct1),
        (val1b, correct1),
        (val2a, correct2),
        (val2b, correct2),
    )
    for observed, expected in comparisons:
        self.assertTrue(same(observed, expected))

    self.assertEqual(counter.frame_count, expected_frame_count)
    if expected_ops is not None:
        self.assertEqual(counter.op_count, expected_ops)
def dummy_fx_compile(
    gm: fx.GraphModule, example_inputs: list[torch.Tensor]
) -> Callable[..., Any]:
    """No-op backend: return ``gm.forward`` and ignore ``example_inputs``."""
    forward = gm.forward
    return forward
def format_speedup(
    speedup: float,
    pvalue: float,
    is_correct: bool = True,
    pvalue_threshold: float = 0.1,
) -> str:
    """Render a speedup measurement as a short string.

    Returns ``"ERROR"`` when ``is_correct`` is false, ``"<speedup>x SAME"``
    when ``pvalue`` is above ``pvalue_threshold``, and
    ``"<speedup>x p=<pvalue>"`` otherwise. The speedup is printed with three
    decimals, the p-value with two.
    """
    if not is_correct:
        return "ERROR"
    ratio = f"{speedup:.3f}x"
    verdict = "SAME" if pvalue > pvalue_threshold else f"p={pvalue:.2f}"
    return f"{ratio} {verdict}"
- def rand_strided(
- size: Sequence[int],
- stride: Sequence[int],
- dtype: torch.dtype = torch.float32,
- device: Union[str, torch.device] = "cpu",
- extra_size: int = 0,
- ) -> torch.Tensor:
- needed_size = extra_size
- if all(s > 0 for s in size):
- # only need to allocate if all sizes are non-zero
- needed_size += (
- sum((shape - 1) * stride for shape, stride in zip(size, stride)) + 1
- )
- if dtype.is_floating_point:
- if dtype.itemsize == 1:
- """
- normal distribution kernel is not implemented for fp8..
- Workaround that by creating a fp16 tensor and then cast.
- """
- buffer = torch.randn(needed_size, dtype=torch.float16, device=device).to(
- dtype=dtype
- )
- else:
- buffer = torch.randn(needed_size, dtype=dtype, device=device)
- else:
- buffer = torch.zeros(size=[needed_size], dtype=dtype, device=device)
- return torch.as_strided(buffer, size, stride)
# Generic return type used in the signatures of the wrapper/decorator helpers below.
_T = TypeVar("_T")
def check_dynamic_shape_capture() -> bool:
    """Return ``True`` exactly when ``config.assume_static_by_default`` is off."""
    # This also mirrors config from `test/dynamo/test_dynamic_shapes.py:make_dynamic_cls`
    static_by_default = config.assume_static_by_default
    return not static_by_default
def _make_fn_with_patches(fn: Callable[_P, _T], *patches: Any) -> Callable[_P, _T]:
    """Wrap ``fn`` so that every call runs with the given patches applied.

    Each entry of ``patches`` is a ``(target, attribute, value)`` triple that is
    applied with ``patch.object`` for the duration of one call and undone
    afterwards.
    """

    @functools.wraps(fn)
    def _fn(*args: _P.args, **kwargs: _P.kwargs) -> _T:
        with contextlib.ExitStack() as active_patches:
            for target, attr_name, replacement in patches:
                active_patches.enter_context(
                    patch.object(target, attr_name, replacement)
                )
            # Called inside the ``with`` so the patches are live while fn runs.
            return fn(*args, **kwargs)

    return _fn
def make_test_cls_with_patches(
    cls: type,
    cls_prefix: str,
    fn_suffix: str,
    *patches: Any,
    xfail_prop: Optional[str] = None,
    decorator: Callable[[Callable[..., Any]], Callable[..., Any]] = lambda x: x,
) -> type:
    """Build a variant of test class ``cls`` whose tests run under ``patches``.

    The new class is named ``cls_prefix + cls.__name__`` and has the same bases
    as ``cls``. Every callable ``test_*`` attribute is wrapped with the patches,
    renamed with ``fn_suffix`` appended, marked as an expected failure when the
    original carries the ``xfail_prop`` attribute, and finally passed through
    ``decorator``. Non-callable ``test_*`` attributes are copied under their
    own name; any other attribute is copied only if the new class does not
    already have it.
    """
    patched_cls = type(f"{cls_prefix}{cls.__name__}", cls.__bases__, {})
    patched_cls.__qualname__ = patched_cls.__name__

    for name in dir(cls):
        if not name.startswith("test_"):
            # NB: Doesn't handle slots correctly, but whatever
            if not hasattr(patched_cls, name):
                setattr(patched_cls, name, getattr(cls, name))
            continue

        fn = getattr(cls, name)
        if not callable(fn):
            setattr(patched_cls, name, getattr(cls, name))
            continue

        new_name = f"{name}{fn_suffix}"
        new_fn = _make_fn_with_patches(fn, *patches)
        new_fn.__name__ = new_name
        if xfail_prop is not None and hasattr(fn, xfail_prop):
            new_fn = unittest.expectedFailure(new_fn)
        setattr(patched_cls, new_name, decorator(new_fn))

    return patched_cls
# test Python 3.11+ specific features
def skipIfNotPy311(fn: Callable[_P, _T]) -> Callable[_P, _T]:
    """Skip ``fn`` unless the interpreter is Python 3.11 or newer.

    On 3.11+ ``fn`` is returned unchanged.
    """
    if sys.version_info >= (3, 11):
        return fn
    # Give an explicit reason and then apply the decorator. ``unittest.skip(fn)``
    # only works when ``fn`` is a plain function and records an empty reason.
    return unittest.skip("Requires Python 3.11+")(fn)
def skipIfNotPy312(fn: Callable[_P, _T]) -> Callable[_P, _T]:
    """Skip ``fn`` on interpreters older than Python 3.12; otherwise return it as-is."""
    if sys.version_info < (3, 12):
        return unittest.skip("Requires Python 3.12+")(fn)
    return fn
def skipIfOnlyNotPy312(fn: Callable[_P, _T]) -> Callable[_P, _T]:
    """Run ``fn`` only on Python 3.12.x; skip it on anything older or newer."""
    if (3, 12) <= sys.version_info < (3, 13):
        return fn
    return unittest.skip("Requires Python 3.12")(fn)
def xfailIfPy312(fn: Callable[_P, _T]) -> Callable[_P, _T]:
    """Mark ``fn`` as an expected failure on Python 3.12 and newer."""
    return unittest.expectedFailure(fn) if sys.version_info >= (3, 12) else fn
def skipIfPy312(fn: Callable[_P, _T]) -> Callable[_P, _T]:
    """Skip ``fn`` on Python 3.12 and newer; return it unchanged on older versions."""
    if sys.version_info < (3, 12):
        return fn
    return unittest.skip("Not supported in Python 3.12+")(fn)
# Controls tests generated in test/inductor/test_torchinductor_dynamic_shapes.py
# and test/dynamo/test_dynamic_shapes.py
def expectedFailureDynamic(fn: Callable[_P, _T]) -> Callable[_P, _T]:
    """Tag ``fn`` with ``_expected_failure_dynamic = True`` and return it."""
    setattr(fn, "_expected_failure_dynamic", True)
    return fn
# Controls tests generated in test/inductor/test_torchinductor_codegen_dynamic_shapes.py
def expectedFailureCodegenDynamic(fn: Callable[_P, _T]) -> Callable[_P, _T]:
    """Tag ``fn`` with ``_expected_failure_codegen_dynamic = True`` and return it."""
    setattr(fn, "_expected_failure_codegen_dynamic", True)
    return fn
# Controls test generated in test/inductor/test_cpp_wrapper.py
def expectedFailureDynamicWrapper(fn: Callable[_P, _T]) -> Callable[_P, _T]:
    """Tag ``fn`` with ``_expected_failure_dynamic_wrapper = True`` and return it."""
    setattr(fn, "_expected_failure_dynamic_wrapper", True)
    return fn
def reset_rng_state(use_xla: bool = False) -> None:
    """Re-seed torch, ``random`` and (when importable) numpy with the fixed seed 1337.

    With ``use_xla=True`` the XLA device RNG state is set to the same seed;
    ``torch_xla`` is imported only in that case.
    """
    seed = 1337
    torch.manual_seed(seed)
    random.seed(seed)
    if np:
        np.random.seed(seed)
    if not use_xla:
        return

    import torch_xla.core.xla_model as xm

    xm.set_rng_state(seed, str(xm.xla_device()))
def _skipped_function_for_test_reconstruct(
    f: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs
) -> _T:
    """Call ``f`` with the given arguments and return its result."""
    result = f(*args, **kwargs)
    return result
# ``None`` while no capture is active; otherwise the list currently being filled.
_testing_invoke_subgraph_inductor_compile_captured_gms = None


@contextlib.contextmanager
def _testing_capture_invoke_subgraph_inductor_compile_gms() -> Generator[
    list[torch.fx.GraphModule]
]:
    """
    Context manager to capture graph modules compiled by invoke_subgraph_inductor_compile.

    While the context is active the module-level
    ``_testing_invoke_subgraph_inductor_compile_captured_gms`` refers to a fresh
    list, which is also the value yielded. On exit the global is restored to
    whatever it held before entry, so contexts can be nested without the inner
    one switching off the outer capture.

    Usage:
        with _testing_capture_invoke_subgraph_inductor_compile_gms() as captured_gms:
            # code that triggers invoke_subgraph_inductor_compile
            pass
        # captured_gms will contain the list of captured graph modules
    """
    global _testing_invoke_subgraph_inductor_compile_captured_gms
    previous = _testing_invoke_subgraph_inductor_compile_captured_gms
    captured: list[torch.fx.GraphModule] = []
    _testing_invoke_subgraph_inductor_compile_captured_gms = captured
    try:
        yield captured
    finally:
        # Restore rather than reset to None: an enclosing capture stays active.
        _testing_invoke_subgraph_inductor_compile_captured_gms = previous
|