| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190 |
- # mypy: allow-untyped-defs
- import gzip
- import json
- import os
- import shutil
- import tempfile
- from abc import ABC, abstractmethod
- from collections.abc import Callable, Iterable
- from enum import Enum
- from functools import partial
- from typing import Any, Optional
- from typing_extensions import deprecated, Self
- from warnings import warn
- import torch
- import torch.autograd.profiler as prof
- from torch._C import _get_privateuse1_backend_name
- from torch._C._profiler import (
- _add_execution_trace_observer,
- _disable_execution_trace_observer,
- _enable_execution_trace_observer,
- _ExperimentalConfig,
- _remove_execution_trace_observer,
- )
- from torch._environment import is_fbcode
- from torch._utils_internal import profiler_allow_cudagraph_cupti_lazy_reinit_cuda12
- from torch.autograd import kineto_available, ProfilerActivity
- from torch.profiler._memory_profiler import MemoryProfile, MemoryProfileTimeline
# Public names of this module (what a star-import exposes).
__all__ = [
    "supported_activities",
    "ProfilerAction",
    "schedule",
    "tensorboard_trace_handler",
    "profile",
    "ExecutionTraceObserver",
]
# NOTE(review): presumably the label used for per-step markers in traces;
# its users are outside this section of the file -- confirm there.
PROFILER_STEP_NAME = "ProfilerStep"
# Messages that have already been emitted through ``_warn_once``.
_WARNINGS_SHOWN = set()


def _warn_once(msg, category=UserWarning, stacklevel=2):
    """Issue ``msg`` as a warning the first time it is seen; ignore repeats.

    Deduplication is keyed on the message text alone, using the module-level
    ``_WARNINGS_SHOWN`` set.

    Args:
        msg: warning message; also the deduplication key.
        category: warning category forwarded to ``warnings.warn``.
        stacklevel: stack level forwarded to ``warnings.warn``.
    """
    if msg in _WARNINGS_SHOWN:
        return
    _WARNINGS_SHOWN.add(msg)
    warn(msg, category=category, stacklevel=stacklevel)
class _NumpyEncoder(json.JSONEncoder):
    """
    JSON encoder that also understands NumPy scalars and arrays.

    NumPy booleans, integers and floating point scalars are converted to the
    matching Python builtin, and ``numpy.ndarray`` is converted with
    ``tolist()``. Anything else, or any object when NumPy cannot be imported,
    is delegated to ``json.JSONEncoder.default`` (which raises ``TypeError``).
    """

    def default(self, obj):
        """Return a JSON-serializable equivalent of ``obj``.

        Raises:
            TypeError: raised by the base encoder for unsupported objects.
        """
        try:
            import numpy as np
        except ImportError:
            return json.JSONEncoder.default(self, obj)
        # ``np.bool_`` is neither a Python ``bool`` nor an ``np.integer``, so
        # without this branch it would be rejected by the base encoder.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)
def supported_activities():
    """
    Return the set of profiler tracing activities supported in this build.

    Note: on-device CUDA kernels are traced through the CUPTI library.
    When CUDA is enabled but CUPTI is not available, passing
    ``ProfilerActivity.CUDA`` to the profiler falls back to the legacy CUDA
    profiling code (the same as in the legacy ``torch.autograd.profiler``).
    In that case CUDA time is included in the profiler table output,
    but not in the JSON trace.
    """
    activities = torch.autograd._supported_activities()
    return activities
class _ITraceObserver(ABC):
    """Abstract interface for a trace observer.

    Concrete observers must implement three methods: ``start``, ``stop``
    and ``cleanup``."""

    @abstractmethod
    def start(self):
        """Begin observing; must be implemented by subclasses."""

    @abstractmethod
    def stop(self):
        """Stop observing; must be implemented by subclasses."""

    @abstractmethod
    def cleanup(self):
        """Release observer resources; must be implemented by subclasses."""
class _KinetoProfile:
    """Low-level profiler that wraps the autograd profiler.

    Args:
        activities (iterable): list of activity groups (CPU, CUDA) to use in profiling, supported values:
            ``torch.profiler.ProfilerActivity.CPU``, ``torch.profiler.ProfilerActivity.CUDA``,
            ``torch.profiler.ProfilerActivity.XPU``.
            Default value: ProfilerActivity.CPU and (when available) ProfilerActivity.CUDA
            or (when available) ProfilerActivity.XPU.
        record_shapes (bool): save information about operator's input shapes.
        profile_memory (bool): track tensor memory allocation/deallocation (see ``export_memory_timeline``
            for more details).
        with_stack (bool): record source information (file and line number) for the ops.
        with_flops (bool): use formula to estimate the FLOPS of specific operators
            (matrix multiplication and 2D convolution).
        with_modules (bool): record module hierarchy (including function names)
            corresponding to the callstack of the op. e.g. If module A's forward call's
            module B's forward which contains an aten::add op,
            then aten::add's module hierarchy is A.B
            Note that this support exist, at the moment, only for TorchScript models
            and not eager mode models.
        experimental_config (_ExperimentalConfig) : A set of experimental options
            used by profiler libraries like Kineto. Note, backward compatibility is not guaranteed.
        execution_trace_observer (ExecutionTraceObserver) : A PyTorch Execution Trace Observer object.
            `PyTorch Execution Traces <https://arxiv.org/pdf/2305.14516.pdf>`__ offer a graph based
            representation of AI/ML workloads and enable replay benchmarks, simulators, and emulators.
            When this argument is included the observer start() and stop() will be called for the
            same time window as PyTorch profiler.
        acc_events (bool): Enable the accumulation of FunctionEvents across multiple profiling cycles
        custom_trace_id_callback (Callable): optional callable taking no arguments and returning a
            string; it is forwarded unchanged to the underlying autograd profiler.
        post_processing_timeout_s (float): Optional timeout in seconds for post-processing profiler
            results. In this context, post-processing happens after the profiling itself has finished.
            If specified, event parsing will stop after this duration and return partial results. Useful
            for handling large traces that may take too long to process.

    .. note::
        This API is experimental and subject to change in the future.

        Enabling shape and stack tracing results in additional overhead.
        When record_shapes=True is specified, profiler will temporarily hold references to the tensors;
        that may further prevent certain optimizations that depend on the reference count and introduce
        extra tensor copies.
    """

    def __init__(
        self,
        *,
        activities: Iterable[ProfilerActivity] | None = None,
        record_shapes: bool = False,
        profile_memory: bool = False,
        with_stack: bool = False,
        with_flops: bool = False,
        with_modules: bool = False,
        experimental_config: _ExperimentalConfig | None = None,
        execution_trace_observer: _ITraceObserver | None = None,
        acc_events: bool = False,
        custom_trace_id_callback: Callable[[], str] | None = None,
        post_processing_timeout_s: float | None = None,
    ) -> None:
        # An empty or missing ``activities`` falls back to everything supported.
        self.activities = set(activities) if activities else supported_activities()
        self.record_shapes = record_shapes
        self.with_flops = with_flops
        self.profile_memory = profile_memory
        self.with_stack = with_stack
        self.with_modules = with_modules
        self.experimental_config = experimental_config
        self.execution_trace_observer = execution_trace_observer
        self.acc_events = acc_events
        self.custom_trace_id_callback = custom_trace_id_callback
        self.post_processing_timeout_s = post_processing_timeout_s
        # The autograd profiler is created lazily in ``prepare_trace``.
        self.profiler: prof.profile | None = None
        self.has_cudagraphs = False
        self.mem_tl: MemoryProfileTimeline | None = None

        # Pick the device name handed to the autograd profiler; the first
        # matching activity in this fixed order wins.
        self.use_device = None
        if ProfilerActivity.CUDA in self.activities:
            # pyrefly: ignore [bad-assignment]
            self.use_device = "cuda"
        elif ProfilerActivity.XPU in self.activities:
            # pyrefly: ignore [bad-assignment]
            self.use_device = "xpu"
        elif ProfilerActivity.MTIA in self.activities:
            # pyrefly: ignore [bad-assignment]
            self.use_device = "mtia"
        elif ProfilerActivity.HPU in self.activities:
            # pyrefly: ignore [bad-assignment]
            self.use_device = "hpu"
        elif ProfilerActivity.PrivateUse1 in self.activities:
            # pyrefly: ignore [bad-assignment]
            self.use_device = _get_privateuse1_backend_name()

        # user-defined metadata to be amended to the trace
        self.preset_metadata: dict[str, str] = {}

    def start(self) -> None:
        """Prepare the trace and immediately start it."""
        self.prepare_trace()
        self.start_trace()

    def stop(self) -> None:
        """Stop the trace (see ``stop_trace``)."""
        self.stop_trace()

    def prepare_trace(self) -> None:
        """Create the underlying autograd profiler if needed and prepare it.

        A new ``prof.profile`` is created when none exists yet or when
        ``acc_events`` is False; with ``acc_events=True`` an existing profiler
        is reused.
        """
        if hasattr(torch, "_inductor"):
            import torch._inductor.config as inductor_config

            self.has_cudagraphs = inductor_config.triton.cudagraphs
        if (self.profiler is None) or (not self.acc_events):
            self.profiler = prof.profile(
                use_cpu=(ProfilerActivity.CPU in self.activities),
                use_device=self.use_device,
                record_shapes=self.record_shapes,
                with_flops=self.with_flops,
                profile_memory=self.profile_memory,
                with_stack=self.with_stack,
                with_modules=self.with_modules,
                use_kineto=True,
                experimental_config=self.experimental_config,
                acc_events=self.acc_events,
                custom_trace_id_callback=self.custom_trace_id_callback,
                post_processing_timeout_s=self.post_processing_timeout_s,
            )
        if (self.profiler is not None) and (not self.acc_events):
            # The sentences are separate literals; each needs its own
            # trailing space or they run together in the emitted message.
            _warn_once(
                "Warning: Profiler clears events at the end of each cycle. "
                "Only events from the current cycle will be reported. "
                "To keep events across cycles, set acc_events=True."
            )
        self.profiler._prepare_trace()

    def start_trace(self) -> None:
        """Start the observer (if any) and the profiler, then attach metadata.

        Raises:
            AssertionError: if ``prepare_trace`` has not created the profiler.
        """
        if self.execution_trace_observer:
            self.execution_trace_observer.start()
        if self.profiler is None:
            raise AssertionError("Profiler must be initialized before starting trace")
        self.profiler._start_trace()

        # Record which profiler options were enabled as trace metadata.
        if self.profile_memory:
            self.add_metadata_json("profile_memory", "1")
        if self.with_stack:
            self.add_metadata_json("with_stack", "1")
        if self.record_shapes:
            self.add_metadata_json("record_shapes", "1")
        if self.with_modules:
            self.add_metadata_json("with_modules", "1")
        if self.with_flops:
            self.add_metadata_json("with_flops", "1")

        if kineto_available():
            dist_info = self._get_distributed_info()
            if dist_info:
                self.add_metadata_json(
                    "distributedInfo", json.dumps(dist_info, cls=_NumpyEncoder)
                )

            cuda_version = None
            if hasattr(torch, "version"):
                from torch.torch_version import TorchVersion

                cuda_version = TorchVersion(getattr(torch.version, "cuda", "0.0"))

            if self.has_cudagraphs and (
                (cuda_version and cuda_version < "12.6")
                or not profiler_allow_cudagraph_cupti_lazy_reinit_cuda12()
            ):
                os.environ["DISABLE_CUPTI_LAZY_REINIT"] = "1"
                self.add_metadata_json("DISABLE_CUPTI_LAZY_REINIT", "1")
                # FIXME: CUDA Graph does not work well with CUPTI teardown.
                #   1) crashes on 1st lazy CUPTI re-init after teardown (CUDA 11)
                #   2) crashes on 2nd non-lazy CUPTI re-init after teardown (CUDA 12)
                # Workaround: turn off CUPTI teardown when using CUDA Graphs.
                os.environ["TEARDOWN_CUPTI"] = "0"

            # Insert the preset user metadata to the trace
            for k, v in self.preset_metadata.items():
                self.add_metadata_json(k, v)

    def stop_trace(self) -> None:
        """Stop the observer (if any) and exit the underlying profiler.

        Raises:
            AssertionError: if the profiler has not been created.
        """
        if self.execution_trace_observer:
            self.execution_trace_observer.stop()
        if self.profiler is None:
            raise AssertionError("Profiler must be initialized before stopping trace")
        self.profiler.__exit__(None, None, None)

    def export_chrome_trace(self, path: str):
        """
        Exports the collected trace in Chrome JSON format. If kineto is enabled, only
        last cycle in schedule is exported.

        When ``path`` ends with ``.gz`` the trace is first written to a
        temporary ``.json`` file and then gzip-compressed into ``path``.

        Raises:
            AssertionError: if the profiler has not been created.
        """
        if self.profiler is None:
            raise AssertionError(
                "Profiler must be initialized before exporting chrome trace"
            )
        if path.endswith(".gz"):
            with tempfile.NamedTemporaryFile("w+b", suffix=".json") as fp:
                retvalue = self.profiler.export_chrome_trace(fp.name)
                with open(fp.name, "rb") as fin, gzip.open(path, "wb") as fout:
                    # Stream in fixed-size chunks instead of line by line so a
                    # trace with very long lines is never held in memory.
                    shutil.copyfileobj(fin, fout)
            return retvalue
        else:
            return self.profiler.export_chrome_trace(path)

    def export_stacks(self, path: str, metric: str = "self_cpu_time_total"):
        """Save stack traces to a file

        Args:
            path (str): save stacks file to this location;
            metric (str): metric to use: "self_cpu_time_total" or "self_cuda_time_total"

        Raises:
            AssertionError: if the profiler has not been created.
        """
        if self.profiler is None:
            raise AssertionError("Profiler must be initialized before exporting stacks")
        return self.profiler.export_stacks(path, metric)

    def toggle_collection_dynamic(
        self, enable: bool, activities: Iterable[ProfilerActivity]
    ) -> None:
        """Toggle collection of activities on/off at any point of collection. Currently supports toggling Torch Ops
        (CPU) and CUDA activity supported in Kineto

        This is a no-op when the profiler has not been created yet.

        Args:
            enable (bool): ``True`` to turn collection on, ``False`` to turn it off.
            activities (iterable): list of activity groups to use in profiling, supported values:
                ``torch.profiler.ProfilerActivity.CPU``, ``torch.profiler.ProfilerActivity.CUDA``

        Examples:

        .. code-block:: python

            with torch.profiler.profile(
                activities=[
                    torch.profiler.ProfilerActivity.CPU,
                    torch.profiler.ProfilerActivity.CUDA,
                ]
            ) as p:
                code_to_profile_0()
                # turn off collection of all CUDA activity
                p.toggle_collection_dynamic(False, [torch.profiler.ProfilerActivity.CUDA])
                code_to_profile_1()
                # turn on collection of all CUDA activity
                p.toggle_collection_dynamic(True, [torch.profiler.ProfilerActivity.CUDA])
                code_to_profile_2()
            print(p.key_averages().table(
                sort_by="self_cuda_time_total", row_limit=-1))
        """
        if self.profiler is None:
            return
        self.profiler.toggle_collection_dynamic(enable, activities)

    def key_averages(
        self,
        group_by_input_shape: bool = False,
        group_by_stack_n: int = 0,
        group_by_overload_name: bool = False,
    ):
        """Averages events, grouping them by operator name and (optionally) input shapes, stack
        and overload name.

        .. note::
            To use shape/stack functionality make sure to set record_shapes/with_stack
            when creating profiler context manager.

        Raises:
            AssertionError: if the profiler has not been created.
        """
        if self.profiler is None:
            raise AssertionError(
                "Profiler must be initialized before getting key averages"
            )
        return self.profiler.key_averages(
            group_by_input_shape, group_by_stack_n, group_by_overload_name
        )

    def events(self):
        """
        Returns the list of unaggregated profiler events,
        to be used in the trace callback or after the profiling is finished

        Raises:
            AssertionError: if the profiler has not been created.
        """
        if self.profiler is None:
            raise AssertionError("Profiler must be initialized before accessing events")
        return self.profiler.function_events

    def add_metadata(self, key: str, value: str) -> None:
        """
        Adds a user defined metadata with a string key and a string value
        into the trace file

        The value is encoded as a JSON string literal, so quotes, backslashes
        and control characters are escaped.
        """
        # json.dumps produces a correctly escaped JSON string; escaping only
        # the double quote by hand yields invalid JSON for values containing
        # backslashes or newlines. ensure_ascii=False keeps non-ASCII text
        # verbatim rather than turning it into \uXXXX escapes.
        wrapped_value = json.dumps(value, ensure_ascii=False)
        torch.autograd._add_metadata_json(key, wrapped_value)

    def add_metadata_json(self, key: str, value: str) -> None:
        """
        Adds a user defined metadata with a string key and a valid json value
        into the trace file
        """
        torch.autograd._add_metadata_json(key, value)

    def preset_metadata_json(self, key: str, value: str) -> None:
        """
        Preset a user defined metadata when the profiler is not started
        and added into the trace file later.
        Metadata is in the format of a string key and a valid json value
        """
        self.preset_metadata[key] = value

    def _get_distributed_info(self):
        """Collect distributed-job details for the trace metadata.

        Returns:
            ``None`` when ``torch.distributed`` is unavailable or not
            initialized; otherwise a dict with the backend, rank, world size,
            process-group count and process-group configs, plus
            ``nccl_version`` when the backend is ``"nccl"``.
        """
        import torch.distributed as dist

        if not dist.is_available() or not dist.is_initialized():
            return None
        backend = dist.get_backend()
        dist_info = {
            "backend": backend,
            "rank": dist.get_rank(),
            "world_size": dist.get_world_size(),
            "pg_count": dist.get_pg_count(),
            "pg_config": dist.distributed_c10d._get_all_pg_configs(),
        }
        if backend == "nccl":
            nccl_version = torch.cuda.nccl.version()
            # pyrefly: ignore [bad-typed-dict-key, unsupported-operation]
            dist_info["nccl_version"] = ".".join(str(v) for v in nccl_version)
        return dist_info

    def _memory_profile(self) -> MemoryProfile:
        """Build a ``MemoryProfile`` from the collected kineto results.

        Raises:
            ValueError: if any of ``record_shapes``, ``profile_memory`` or
                ``with_stack`` is not enabled.
            AssertionError: if the profiler or its kineto results are missing.
        """
        required = ("record_shapes", "profile_memory", "with_stack")
        missing = [f"{i}=True" for i in required if not getattr(self, i)]
        if missing:
            raise ValueError(f"{', '.join(missing)} required for memory profiling.")

        if self.profiler is None or self.profiler.kineto_results is None:
            raise AssertionError(
                "Profiler and kineto_results must be initialized for memory profiling"
            )
        return MemoryProfile(self.profiler.kineto_results)

    @deprecated(
        "`export_memory_timeline` is deprecated and will be removed in a future version. "
        "Please use `torch.cuda.memory._record_memory_history` and `torch.cuda.memory._export_memory_snapshot` instead.",
        category=FutureWarning,
    )
    def export_memory_timeline(self, path: str, device: str | None = None) -> None:
        """Export memory event information from the profiler collected
        tree for a given device, and export a timeline plot. There are 3
        exportable files using ``export_memory_timeline``, each controlled by the
        ``path``'s suffix.

        - For an HTML compatible plot, use the suffix ``.html``, and a memory timeline
          plot will be embedded as a PNG file in the HTML file.

        - For plot points consisting of ``[times, [sizes by category]]``, where
          ``times`` are timestamps and ``sizes`` are memory usage for each category.
          The memory timeline plot will be saved a JSON (``.json``) or gzipped JSON
          (``.json.gz``) depending on the suffix.

        - For raw memory points, use the suffix ``.raw.json.gz``. Each raw memory
          event will consist of ``(timestamp, action, numbytes, category)``, where
          ``action`` is one of ``[PREEXISTING, CREATE, INCREMENT_VERSION, DESTROY]``,
          and ``category`` is one of the enums from
          ``torch.profiler._memory_profiler.Category``.

        Output: Memory timeline written as gzipped JSON, JSON, or HTML.

        .. deprecated::
            ``export_memory_timeline`` is deprecated and will be removed in a future version.
            Please use ``torch.cuda.memory._record_memory_history`` and
            ``torch.cuda.memory._export_memory_snapshot`` instead.
        """
        # Default to device 0, if unset. Fallback on cpu.
        if device is None:
            if self.use_device and self.use_device != "cuda":
                device = self.use_device + ":0"
            else:
                device = "cuda:0" if torch.cuda.is_available() else "cpu"

        # Construct the memory timeline plot data
        self.mem_tl = MemoryProfileTimeline(self._memory_profile())

        # Depending on the file suffix, save the data as json.gz or json.
        # For html, we can embed the image into an HTML file.
        if path.endswith(".html"):
            self.mem_tl.export_memory_timeline_html(path, device)
        elif path.endswith(".gz"):
            with tempfile.NamedTemporaryFile("w+t", suffix=".json") as fp:
                if path.endswith("raw.json.gz"):
                    self.mem_tl.export_memory_timeline_raw(fp.name, device)
                else:
                    self.mem_tl.export_memory_timeline(fp.name, device)
                with open(fp.name) as fin, gzip.open(path, "wt") as fout:
                    fout.writelines(fin)
        else:
            self.mem_tl.export_memory_timeline(path, device)
class ProfilerAction(Enum):
    """
    Actions the profiler can be asked to take at a given step.
    """

    # Do nothing at this step.
    NONE = 0
    # Warm-up step.
    WARMUP = 1
    # Record this step.
    RECORD = 2
    # Record this step and save the trace afterwards.
    RECORD_AND_SAVE = 3
def schedule(
    *,
    wait: int,
    warmup: int,
    active: int,
    repeat: int = 0,
    skip_first: int = 0,
    skip_first_wait: int = 0,
) -> Callable:
    """
    Build a callable suitable for the profiler's ``schedule`` argument.

    The returned function maps a step number to a ``ProfilerAction``. The first
    ``skip_first`` steps are skipped; after that each cycle consists of ``wait``
    idle steps, ``warmup`` warm-up steps and ``active`` recording steps, the last
    of which is ``RECORD_AND_SAVE``. Cycles repeat from the ``wait`` stage.

    ``repeat`` limits the number of cycles; zero means the cycles continue until
    profiling is finished.

    A non-zero ``skip_first_wait`` skips the ``wait`` stage of the first cycle only.
    For example, with ``skip_first=10`` and ``wait=20`` the first warm-up starts
    after 10 + 20 = 30 steps when ``skip_first_wait`` is zero, but after only 10
    steps when it is non-zero. Every later cycle still waits 20 steps between the
    last active step and the next warm-up.

    Raises:
        AssertionError: if ``wait``, ``warmup``, ``repeat`` or ``skip_first`` is
            negative, or ``active`` is not positive. The returned function also
            raises ``AssertionError`` for a negative step.
    """
    has_invalid_argument = (
        wait < 0 or warmup < 0 or active <= 0 or repeat < 0 or skip_first < 0
    )
    if has_invalid_argument:
        raise AssertionError(
            f"Invalid profiler schedule arguments. Got wait={wait} (need >= 0), warmup={warmup} (need >= 0), "
            f"active={active} (need > 0), repeat={repeat} (need >= 0), skip_first={skip_first} (need >= 0)."
        )
    if warmup == 0:
        warn(
            "Profiler won't be using warmup, this can skew profiler results",
            stacklevel=2,
        )

    cycle_length = wait + warmup + active

    def schedule_fn(step: int) -> ProfilerAction:
        if step < 0:
            raise AssertionError(f"Step must be non-negative. Got {step}.")
        if step < skip_first:
            return ProfilerAction.NONE

        shifted = step - skip_first
        # Shifting forward by ``wait`` makes the first cycle begin at warm-up.
        if skip_first_wait != 0:
            shifted += wait

        if repeat > 0 and shifted / cycle_length >= repeat:
            return ProfilerAction.NONE

        position = shifted % cycle_length
        if position < wait:
            return ProfilerAction.NONE
        if position < wait + warmup:
            return ProfilerAction.WARMUP
        if position < cycle_length - 1:
            return ProfilerAction.RECORD
        return ProfilerAction.RECORD_AND_SAVE

    return schedule_fn
def _default_schedule_fn(_: int) -> ProfilerAction:
    """
    Default schedule: record on every profiler step, starting immediately.

    The step number is ignored.
    """
    return ProfilerAction.RECORD
- def tensorboard_trace_handler(
- dir_name: str, worker_name: str | None = None, use_gzip: bool = False
- ):
- """
- Outputs tracing files to directory of ``dir_name``, then that directory can be
- directly delivered to tensorboard as logdir.
- ``worker_name`` should be unique for each worker in distributed scenario,
- it will be set to '[hostname]_[pid]' by default.
- """
- import socket
- import time
- def handler_fn(prof) -> None:
- nonlocal worker_name
- if not os.path.isdir(dir_name):
- try:
- os.makedirs(dir_name, exist_ok=True)
- except Exception as e:
- raise RuntimeError("Can't create directory: " + dir_name) from e
- if not worker_name:
- worker_name = f"{socket.gethostname()}_{os.getpid()}"
- # Use nanosecond here to avoid naming clash when exporting the trace
- file_name = f"{worker_name}.{time.time_ns()}.pt.trace.json"
- if use_gzip:
- file_name = file_name + ".gz"
- prof.export_chrome_trace(os.path.join(dir_name, file_name))
- return handler_fn
- class profile(_KinetoProfile):
- """Profiler context manager.
- Args:
- activities (iterable): list of activity groups (CPU, CUDA) to use in profiling, supported values:
- ``torch.profiler.ProfilerActivity.CPU``, ``torch.profiler.ProfilerActivity.CUDA``,
- ``torch.profiler.ProfilerActivity.XPU``.
- Default value: ProfilerActivity.CPU and (when available) ProfilerActivity.CUDA
- or (when available) ProfilerActivity.XPU.
- schedule (Callable): callable that takes step (int) as a single parameter and returns
- ``ProfilerAction`` value that specifies the profiler action to perform at each step.
- on_trace_ready (Callable): callable that is called at each step when ``schedule``
- returns ``ProfilerAction.RECORD_AND_SAVE`` during the profiling.
- record_shapes (bool): save information about operator's input shapes.
- profile_memory (bool): track tensor memory allocation/deallocation.
- with_stack (bool): record source information (file and line number) for the ops.
- with_flops (bool): use formula to estimate the FLOPs (floating point operations) of specific operators
- (matrix multiplication and 2D convolution).
- with_modules (bool): record module hierarchy (including function names)
- corresponding to the callstack of the op. e.g. If module A's forward call's
- module B's forward which contains an aten::add op,
- then aten::add's module hierarchy is A.B
- Note that this support exist, at the moment, only for TorchScript models
- and not eager mode models.
- experimental_config (_ExperimentalConfig) : A set of experimental options
- used for Kineto library features. Note, backward compatibility is not guaranteed.
- execution_trace_observer (ExecutionTraceObserver) : A PyTorch Execution Trace Observer object.
- `PyTorch Execution Traces <https://arxiv.org/pdf/2305.14516.pdf>`__ offer a graph based
- representation of AI/ML workloads and enable replay benchmarks, simulators, and emulators.
- When this argument is included the observer start() and stop() will be called for the
- same time window as PyTorch profiler. See the examples section below for a code sample.
- acc_events (bool): Enable the accumulation of FunctionEvents across multiple profiling cycles
- post_processing_timeout_s (float): Optional timeout in seconds for post-processing profiler
- results. If specified, event parsing will stop after this duration and return partial
- results. Useful for handling large traces that may take too long to process.
- use_cuda (bool):
- .. deprecated:: 1.8.1
- use ``activities`` instead.
- .. note::
- Use :func:`~torch.profiler.schedule` to generate the callable schedule.
- Non-default schedules are useful when profiling long training jobs
- and allow the user to obtain multiple traces at the different iterations
- of the training process.
- The default schedule simply records all the events continuously for the
- duration of the context manager.
- .. note::
- Use :func:`~torch.profiler.tensorboard_trace_handler` to generate result files for TensorBoard:
- ``on_trace_ready=torch.profiler.tensorboard_trace_handler(dir_name)``
- After profiling, result files can be found in the specified directory. Use the command:
- ``tensorboard --logdir dir_name``
- to see the results in TensorBoard.
- For more information, see
- `PyTorch Profiler TensorBoard Plugin <https://github.com/pytorch/kineto/tree/master/tb_plugin>`__
- .. note::
- Enabling shape and stack tracing results in additional overhead.
- When record_shapes=True is specified, profiler will temporarily hold references to the tensors;
- that may further prevent certain optimizations that depend on the reference count and introduce
- extra tensor copies.
- Examples:
- .. code-block:: python
- with torch.profiler.profile(
- activities=[
- torch.profiler.ProfilerActivity.CPU,
- torch.profiler.ProfilerActivity.CUDA,
- ]
- ) as p:
- code_to_profile()
- print(p.key_averages().table(sort_by="self_cuda_time_total", row_limit=-1))
- Using the profiler's ``schedule``, ``on_trace_ready`` and ``step`` functions:
- .. code-block:: python
- # Non-default profiler schedule allows user to turn profiler on and off
- # on different iterations of the training loop;
- # trace_handler is called every time a new trace becomes available
- def trace_handler(prof):
- print(
- prof.key_averages().table(sort_by="self_cuda_time_total", row_limit=-1)
- )
- # prof.export_chrome_trace("/tmp/test_trace_" + str(prof.step_num) + ".json")
- with torch.profiler.profile(
- activities=[
- torch.profiler.ProfilerActivity.CPU,
- torch.profiler.ProfilerActivity.CUDA,
- ],
- # In this example with wait=1, warmup=1, active=2, repeat=1,
- # profiler will skip the first step/iteration,
- # start warming up on the second, record
- # the third and the forth iterations,
- # after which the trace will become available
- # and on_trace_ready (when set) is called;
- # the cycle repeats starting with the next step
- schedule=torch.profiler.schedule(wait=1, warmup=1, active=2, repeat=1),
- on_trace_ready=trace_handler,
- # on_trace_ready=torch.profiler.tensorboard_trace_handler('./log')
- # used when outputting for tensorboard
- ) as p:
- for iter in range(N):
- code_iteration_to_profile(iter)
- # send a signal to the profiler that the next iteration has started
- p.step()
- The following sample shows how to setup up an Execution Trace Observer (`execution_trace_observer`)
- .. code-block:: python
- with torch.profiler.profile(
- ...
- execution_trace_observer=(
- ExecutionTraceObserver().register_callback("./execution_trace.json")
- ),
- ) as p:
- for iter in range(N):
- code_iteration_to_profile(iter)
- p.step()
- You can also refer to test_execution_trace_with_kineto() in tests/profiler/test_profiler.py.
- Note: One can also pass any object satisfying the _ITraceObserver interface.
- """
def __init__(
    self,
    *,
    activities: Iterable[ProfilerActivity] | None = None,
    schedule: Callable[[int], ProfilerAction] | None = None,
    on_trace_ready: Callable[..., Any] | None = None,
    record_shapes: bool = False,
    profile_memory: bool = False,
    with_stack: bool = False,
    with_flops: bool = False,
    with_modules: bool = False,
    experimental_config: _ExperimentalConfig | None = None,
    execution_trace_observer: _ITraceObserver | None = None,
    acc_events: bool = False,
    # deprecated:
    use_cuda: bool | None = None,
    custom_trace_id_callback: Callable[[], str] | None = None,
    post_processing_timeout_s: float | None = None,
) -> None:
    """
    Resolve the activity set, initialize the base profiler and build the
    schedule state machine.

    The deprecated ``use_cuda`` flag, when given, emits a ``FutureWarning``
    and adds (``True``) or removes (``False``) ``ProfilerActivity.CUDA``
    from the resolved activity set.

    When ``execution_trace_observer`` is falsy, the observer is obtained from
    ``ExecutionTraceObserver.build_execution_trace_obs_from_env()``.

    Step markers (``record_steps``) are only recorded when an explicit
    ``schedule`` is supplied; otherwise ``_default_schedule_fn`` is used.

    Raises:
        AssertionError: if the resolved activity set is empty.
    """
    # Materialize the activities exactly once. ``activities`` may be a
    # one-shot iterable, and the fallback set is copied so that the
    # ``use_cuda`` adjustments below never mutate a set owned by someone else.
    activities_set = (
        set(activities) if activities else set(supported_activities())
    )
    if use_cuda is not None:
        warn(
            "`use_cuda` is deprecated, use `activities` argument instead",
            FutureWarning,
            stacklevel=2,
        )
        if use_cuda:
            activities_set.add(ProfilerActivity.CUDA)
        elif ProfilerActivity.CUDA in activities_set:
            activities_set.remove(ProfilerActivity.CUDA)
    if len(activities_set) == 0:
        raise AssertionError("No valid profiler activities found")

    super().__init__(
        # Forward the resolved set (not the raw argument) so that the
        # ``use_cuda`` adjustment takes effect and an already-consumed
        # iterable is not handed to the base class.
        activities=activities_set,
        record_shapes=record_shapes,
        profile_memory=profile_memory,
        with_stack=with_stack,
        with_flops=with_flops,
        with_modules=with_modules,
        experimental_config=experimental_config,
        execution_trace_observer=execution_trace_observer
        if execution_trace_observer
        else ExecutionTraceObserver.build_execution_trace_obs_from_env(),
        acc_events=acc_events,
        custom_trace_id_callback=custom_trace_id_callback,
        post_processing_timeout_s=post_processing_timeout_s,
    )

    if schedule:
        self.schedule = schedule
        # add step markers into the trace and table view
        self.record_steps = True
    else:
        self.schedule = _default_schedule_fn
        self.record_steps = False
    self.on_trace_ready = on_trace_ready
    self.step_num = 0
    self.current_action = self.schedule(self.step_num)
    self.step_rec_fn: prof.record_function | None = None

    self.action_map: dict[
        tuple[ProfilerAction, ProfilerAction | None], list[Any]
    ] = {
        # key is (prev_action, current_action), value is action list corresponding to the state pair.
        (ProfilerAction.NONE, ProfilerAction.NONE): [],
        (ProfilerAction.NONE, ProfilerAction.WARMUP): [self.prepare_trace],
        (ProfilerAction.NONE, ProfilerAction.RECORD): [
            self.prepare_trace,
            self.start_trace,
        ],
        (ProfilerAction.NONE, ProfilerAction.RECORD_AND_SAVE): [
            self.prepare_trace,
            self.start_trace,
        ],
        (ProfilerAction.WARMUP, ProfilerAction.NONE): [
            partial(warn, "Incorrect schedule: WARMUP followed by NONE"),
            self.start_trace,
            self.stop_trace,
        ],
        (ProfilerAction.WARMUP, ProfilerAction.WARMUP): [],
        (ProfilerAction.WARMUP, ProfilerAction.RECORD): [self.start_trace],
        (ProfilerAction.WARMUP, ProfilerAction.RECORD_AND_SAVE): [self.start_trace],
        (ProfilerAction.RECORD, ProfilerAction.NONE): [
            partial(warn, "Incorrect schedule: RECORD followed by NONE"),
            self.stop_trace,
        ],
        (ProfilerAction.RECORD, ProfilerAction.WARMUP): [
            partial(warn, "Incorrect schedule: RECORD followed by WARMUP"),
            self.stop_trace,
        ],
        (ProfilerAction.RECORD, ProfilerAction.RECORD): [],
        (ProfilerAction.RECORD, ProfilerAction.RECORD_AND_SAVE): [],
        (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.NONE): [
            self.stop_trace,
            self._trace_ready,
        ],
        (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.WARMUP): [
            self.stop_trace,
            self._trace_ready,
            self.prepare_trace,
        ],
        (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.RECORD): [
            self.stop_trace,
            self._trace_ready,
            self.prepare_trace,
            self.start_trace,
        ],
        (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.RECORD_AND_SAVE): [
            self.stop_trace,
            self._trace_ready,
            self.prepare_trace,
            self.start_trace,
        ],
        # used for exit action
        (ProfilerAction.WARMUP, None): [self.start_trace, self.stop_trace],
        (ProfilerAction.RECORD, None): [self.stop_trace, self._trace_ready],
        (ProfilerAction.RECORD_AND_SAVE, None): [
            self.stop_trace,
            self._trace_ready,
        ],
    }
    # Start tracking increments to profiler step, this will be used
    # by Kineto
    prof.KinetoStepTracker.init_step_count(PROFILER_STEP_NAME)
- def __enter__(self):
- self.start()
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.stop()
- prof.KinetoStepTracker.erase_step_count(PROFILER_STEP_NAME)
- if self.execution_trace_observer:
- self.execution_trace_observer.cleanup()
def start(self) -> None:
    """
    Run the transition from ``ProfilerAction.NONE`` to the current action and,
    when step recording is enabled, open the ``ProfilerStep#<n>`` record
    function for the current step.
    """
    self._transit_action(ProfilerAction.NONE, self.current_action)
    if not self.record_steps:
        return
    step_label = "ProfilerStep#" + str(self.step_num)
    self.step_rec_fn = prof.record_function(step_label)
    self.step_rec_fn.__enter__()
def stop(self) -> None:
    """
    Close the currently open step record function (if step recording is on
    and one exists), then run the exit transition for the current action.
    """
    open_step = self.step_rec_fn if self.record_steps else None
    if open_step:
        open_step.__exit__(None, None, None)
    self._transit_action(self.current_action, None)
def step(self) -> None:
    """
    Signals the profiler that the next profiling step has started.

    Closes the previous step's record function, advances ``step_num``, asks
    the schedule for the new action and runs the matching transition. The
    Kineto step counter is incremented only when ``KINETO_USE_DAEMON`` is set,
    or when running in fbcode with ``KINETO_FORCE_STEP_HOOK`` set.
    """
    if self.record_steps and self.step_rec_fn:
        self.step_rec_fn.__exit__(None, None, None)

    previous = self.current_action
    self.step_num += 1
    self.current_action = self.schedule(self.step_num)
    self._transit_action(previous, self.current_action)

    use_step_hook = os.environ.get("KINETO_USE_DAEMON", "") or (
        is_fbcode() and os.environ.get("KINETO_FORCE_STEP_HOOK", "")
    )
    if use_step_hook:
        prof.KinetoStepTracker.increment_step(PROFILER_STEP_NAME)

    if not self.record_steps:
        return
    step_label = "ProfilerStep#" + str(self.step_num)
    self.step_rec_fn = prof.record_function(step_label)
    self.step_rec_fn.__enter__()
def set_custom_trace_id_callback(self, callback) -> None:
    """
    Store ``callback`` as ``custom_trace_id_callback``, the hook to be called
    when a new trace ID is generated.
    """
    self.custom_trace_id_callback = callback
def get_trace_id(self):
    """
    Returns the ``trace_id`` of the underlying profiler, or ``None`` when no
    profiler object is set.
    """
    active = self.profiler
    return None if active is None else active.trace_id
- def _trace_ready(self) -> None:
- if self.on_trace_ready:
- self.on_trace_ready(self)
- def _transit_action(self, prev_action, current_action) -> None:
- action_list = self.action_map.get((prev_action, current_action))
- if action_list:
- for action in action_list:
- action()
- def _stats(self) -> prof._ProfilerStats | None:
- if self.profiler is None:
- return None
- return self.profiler._stats
- class ExecutionTraceObserver(_ITraceObserver):
- """Execution Trace Observer
- Each process can have a single ExecutionTraceObserver instance. The observer
- can be added to record function callbacks via calling register_callback()
- explicitly. Without calling unregister_callback(), repeated calls to
- register_callback() will not add additional observers to record function
- callbacks. Once an ExecutionTraceObserver is created, the start() and stop()
- methods control when the event data is recorded.
- Deleting or calling unregister_callback() will remove the observer from the
- record function callbacks, finalize the output file, and will stop
- incurring any overheads.
- """
- def __init__(self) -> None:
- """
- Initializes the default states.
- """
- self._registered = False
- self._execution_trace_running = False
- self.extra_resources_collection = False
- self.resources_dir: str = ""
- self.output_file_path: str = ""
- self.output_file_path_observer: str = ""
- def __del__(self) -> None:
- """
- Calls unregister_callback() to make sure to finalize outputs.
- """
- self.unregister_callback()
- @staticmethod
- def build_execution_trace_obs_from_env() -> Optional["ExecutionTraceObserver"]:
- """
- Returns an ExecutionTraceObserver instance if the environment variable
- ENABLE_PYTORCH_EXECUTION_TRACE is set to 1, otherwise returns None.
- Configures the observer to also collect extra resources if the environment variable
- ``ENABLE_PYTORCH_EXECUTION_TRACE_EXTRAS=1``. These are resources such as generated kernels,
- index tensor data etc. that are required to make the Execution Trace replayable.
- """
- if os.environ.get("ENABLE_PYTORCH_EXECUTION_TRACE", "0") == "1":
- try:
- with tempfile.NamedTemporaryFile(
- "w+t", suffix=".et.json", delete=False
- ) as fp:
- filename = fp.name
- except Exception as e:
- warn(
- f"Execution trace will not be recorded. Exception on creating default temporary file: {e}",
- stacklevel=2,
- )
- return None
- et = ExecutionTraceObserver()
- et.register_callback(filename)
- # additionally, check if the env requires us to collect extra resources
- if os.environ.get("ENABLE_PYTORCH_EXECUTION_TRACE_EXTRAS", "0") == "1":
- et.set_extra_resource_collection(True)
- else:
- et.set_extra_resource_collection(False)
- return et
- return None
- def set_extra_resource_collection(self, val) -> None:
- """
- Collects extra resources such as generated kernels, index tensor data, and any other
- metadata that is required to complete the Execution Trace content.
- The caller should call this method with val=True after calling register_callback() if they want
- to collect the extra resources.
- """
- self.extra_resources_collection = val
- if self.extra_resources_collection:
- self.get_resources_dir(can_create=True)
- return
- def register_callback(self, output_file_path: str) -> Self:
- """
- Adds ET observer to record function callbacks. The data will be
- written to output_file_path.
- """
- def get_temp_uncompressed_file() -> str:
- with tempfile.NamedTemporaryFile("w+b", suffix=".json", delete=False) as fp:
- return fp.name
- if not self._registered:
- self.output_file_path = output_file_path
- if output_file_path.endswith(".gz"):
- output_file_path = get_temp_uncompressed_file()
- self.output_file_path_observer = output_file_path
- self._registered = _add_execution_trace_observer(output_file_path)
- return self
- def get_resources_dir(self, can_create=False) -> str | None:
- """
- Generates the resources directory for the generated kernels,
- or index tensor data or any other metadata that is required
- to complete the Execution Trace content.
- The directory is created right where the ET file is being output.
- Only works if the observer has called set_extra_resource_collection(val=True).
- Returns None if the observer is not configured with extra resource collection.
- """
- if not self.extra_resources_collection:
- return None
- if self.resources_dir:
- # already created
- return self.resources_dir
- generated_path = ExecutionTraceObserver.get_resources_dir_for_et_path(
- self.output_file_path, create_dir=can_create
- )
- if not generated_path:
- # could not find of create the resources dir
- return None
- self.resources_dir = generated_path
- return self.resources_dir
- @staticmethod
- def get_resources_dir_for_et_path(
- trace_path, create_dir: bool = False
- ) -> str | None:
- work_dir, file_name = os.path.split(trace_path)
- resource_dir = os.path.join(
- work_dir, os.path.splitext(file_name)[0] + "_resources"
- )
- if not os.path.exists(resource_dir):
- if create_dir:
- try:
- os.mkdir(resource_dir)
- except Exception:
- warn(
- f"Execution trace exception when creating {resource_dir}",
- stacklevel=2,
- )
- return None
- else:
- return None
- return resource_dir
- def unregister_callback(self) -> None:
- """
- Removes ET observer from record function callbacks.
- """
- def _save_triton_kernels() -> None:
- try:
- resource_dir = self.get_resources_dir()
- except Exception as e:
- warn(
- f"Execution trace exception when generating resource directory: {e}",
- stacklevel=2,
- )
- return
- if not resource_dir:
- return
- # Save the kernel paths for the generated kernels
- from torch._inductor.codecache import PyCodeCache
- kernel_files = [
- v.__file__
- for v in PyCodeCache.modules
- if getattr(v, "__file__", None) is not None
- ]
- for kernel_file in kernel_files:
- if kernel_file is None:
- continue
- name = os.path.basename(kernel_file)
- dst = os.path.join(resource_dir, name)
- shutil.copyfile(kernel_file, dst)
- def _save_gz_file(uncompressed_file: str, output_file: str) -> None:
- print(f"Execution Trace: compressing {uncompressed_file} to {output_file}")
- with open(uncompressed_file, "rb") as fin:
- with gzip.open(output_file, "wb") as fout:
- fout.writelines(fin)
- os.remove(uncompressed_file)
- if self._registered:
- self.stop()
- try:
- _save_triton_kernels()
- except Exception as e:
- warn(f"Execution trace failed to save kernels: {e}", stacklevel=2)
- _remove_execution_trace_observer()
- if self.output_file_path.endswith("gz"):
- _save_gz_file(self.output_file_path_observer, self.output_file_path)
- self._registered = False
- @property
- def is_registered(self):
- """
- Returns True if the execution trace observer is registered, otherwise False.
- """
- return self._registered
- def is_running(self):
- """
- Returns True if the observer is running, otherwise False.
- """
- return self._execution_trace_running
- def start(self) -> None:
- """
- Starts to capture.
- """
- if self._registered and not self._execution_trace_running:
- _enable_execution_trace_observer()
- self._execution_trace_running = True
- self._record_pg_config()
- def stop(self) -> None:
- """
- Stops to capture.
- """
- if self._execution_trace_running:
- _disable_execution_trace_observer()
- self._execution_trace_running = False
- def cleanup(self) -> None:
- """
- Calls unregister_callback() to make sure to finalize outputs.
- """
- self.unregister_callback()
- def get_output_file_path(self) -> str | None:
- """
- Returns the output file name or None.
- """
- if self.output_file_path:
- return self.output_file_path
- else:
- return None
- def _record_pg_config(self) -> None:
- # Records the PG config info to the trace as node:
- # ## process_group:init ##
- if (
- self.is_registered
- and torch.distributed.is_available()
- and torch.distributed.is_initialized()
- ):
- pg_config_info = torch.distributed.distributed_c10d._world.pg_config_info
- torch.autograd._record_function_with_args_enter(
- "## process_group:init ##",
- json.dumps(pg_config_info, cls=_NumpyEncoder),
- )
|