| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336 |
- # Copyright 2020-present the HuggingFace Inc. team.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """
- Torch utilities for the Trainer class.
- """
- import contextlib
- import copy
- import datetime
- import io
- import json
- import math
- import os
- import re
- import sys
- import warnings
- from collections.abc import Iterator, Mapping
- from contextlib import contextmanager
- from dataclasses import dataclass, field
- from itertools import chain
- from logging import StreamHandler
- from typing import Any
- import numpy as np
- import torch
- import torch.distributed as dist
- from packaging import version
- from torch import nn
- from torch.utils.data import Dataset, IterableDataset, RandomSampler, Sampler
- from torch.utils.data.distributed import DistributedSampler
- from .integrations.deepspeed import is_deepspeed_zero3_enabled
- from .tokenization_utils_base import BatchEncoding
- from .utils import (
- is_sagemaker_mp_enabled,
- is_torch_available,
- is_torch_xla_available,
- is_training_run_on_sagemaker,
- logging,
- )
- if is_training_run_on_sagemaker():
- logging.add_handler(StreamHandler(sys.stdout))
- if is_torch_xla_available():
- import torch_xla.runtime as xr
- if is_torch_available():
- from torch.optim.lr_scheduler import LRScheduler
- logger = logging.get_logger(__name__)
def get_dataloader_sampler(dataloader):
    """
    Return the innermost sampler reachable from `dataloader`.

    Follows the chain of non-`None` `batch_sampler` attributes as far as it goes, then returns the `sampler`
    attribute of the last object reached, or `None` when that object has no `sampler` attribute.
    """
    current = dataloader
    # Unwrap nested batch samplers until we reach an object that does not wrap another one.
    while getattr(current, "batch_sampler", None) is not None:
        current = current.batch_sampler
    return getattr(current, "sampler", None)
- def atleast_1d(tensor_or_array: torch.Tensor | np.ndarray):
- if isinstance(tensor_or_array, torch.Tensor):
- tensor_or_array = torch.atleast_1d(tensor_or_array)
- else:
- tensor_or_array = np.atleast_1d(tensor_or_array)
- return tensor_or_array
def torch_pad_and_concatenate(tensor1, tensor2, padding_index=-100):
    """
    Stack `tensor1` and `tensor2` along the first axis, padding the second axis with `padding_index` when the
    two inputs disagree on its size.
    """
    first, second = atleast_1d(tensor1), atleast_1d(tensor2)

    # 1-d inputs, or inputs that already agree on the second axis, can be concatenated directly.
    if first.dim() == 1 or first.shape[1] == second.shape[1]:
        return torch.cat((first, second), dim=0)

    rows_first = first.shape[0]
    padded_width = max(first.shape[1], second.shape[1])
    out_shape = (rows_first + second.shape[0], padded_width) + first.shape[2:]

    # Start from a buffer full of padding, then copy each input into its own rows.
    merged = first.new_full(out_shape, padding_index)
    merged[:rows_first, : first.shape[1]] = first
    merged[rows_first:, : second.shape[1]] = second
    return merged
def numpy_pad_and_concatenate(array1, array2, padding_index=-100):
    """
    Stack `array1` and `array2` along the first axis, padding the second axis with `padding_index` when the
    two inputs disagree on its size.
    """
    first, second = atleast_1d(array1), atleast_1d(array2)

    # 1-d inputs, or inputs that already agree on the second axis, can be concatenated directly.
    if first.ndim == 1 or first.shape[1] == second.shape[1]:
        return np.concatenate((first, second), axis=0)

    rows_first = first.shape[0]
    padded_width = max(first.shape[1], second.shape[1])
    out_shape = (rows_first + second.shape[0], padded_width) + first.shape[2:]

    # Start from a buffer full of padding (same dtype as the first input), then copy each input into its rows.
    merged = np.full_like(first, padding_index, shape=out_shape)
    merged[:rows_first, : first.shape[1]] = first
    merged[rows_first:, : second.shape[1]] = second
    return merged
def nested_concat(tensors, new_tensors, padding_index=-100):
    """
    Recursively append `new_tensors` to `tensors` along the first dimension, padding the second one if needed.

    Both arguments must share the same structure: a torch tensor, a numpy array, or a list/tuple/mapping of
    those (nested arbitrarily). Containers are rebuilt with the type of `tensors`.

    Raises:
        AssertionError: if the two arguments have different types (unless both are torch tensors).
        TypeError: if `tensors` is of an unsupported type.
    """
    both_torch = isinstance(tensors, torch.Tensor) and isinstance(new_tensors, torch.Tensor)
    if not both_torch:
        assert type(tensors) is type(new_tensors), (
            f"Expected `tensors` and `new_tensors` to have the same type but found {type(tensors)} and {type(new_tensors)}."
        )

    if isinstance(tensors, (list, tuple)):
        pairs = zip(tensors, new_tensors)
        return type(tensors)(nested_concat(old, new, padding_index=padding_index) for old, new in pairs)
    if isinstance(tensors, torch.Tensor):
        return torch_pad_and_concatenate(tensors, new_tensors, padding_index=padding_index)
    if isinstance(tensors, Mapping):
        # Keys are driven by `tensors`; `new_tensors` must provide every one of them.
        merged = {
            key: nested_concat(old, new_tensors[key], padding_index=padding_index) for key, old in tensors.items()
        }
        return type(tensors)(merged)
    if isinstance(tensors, np.ndarray):
        return numpy_pad_and_concatenate(tensors, new_tensors, padding_index=padding_index)
    raise TypeError(f"Unsupported type for concatenation: got {type(tensors)}")
def find_batch_size(tensors):
    """
    Return the size of the first dimension of the first tensor/array found in a nested list/tuple/dict.

    Containers are searched depth-first in iteration order. Returns `None` when nothing with at least one
    dimension is found.
    """
    if isinstance(tensors, (torch.Tensor, np.ndarray)):
        # 0-d tensors/arrays carry no batch dimension.
        return tensors.shape[0] if tensors.ndim >= 1 else None

    if isinstance(tensors, (list, tuple)):
        children = tensors
    elif isinstance(tensors, Mapping):
        children = tensors.values()
    else:
        return None

    sizes = (find_batch_size(child) for child in children)
    return next((size for size in sizes if size is not None), None)
def nested_numpify(tensors):
    """
    Convert `tensors` (a torch tensor or a nested list/tuple/dict of them) to numpy arrays.

    Containers are rebuilt with their original type. Tensors are moved to CPU first, and bfloat16 tensors are
    cast to float32 before the conversion.
    """
    if isinstance(tensors, (list, tuple)):
        return type(tensors)(nested_numpify(item) for item in tensors)
    if isinstance(tensors, Mapping):
        return type(tensors)({key: nested_numpify(value) for key, value in tensors.items()})

    on_cpu = tensors.cpu()
    # As of Numpy 1.21.4, NumPy does not support bfloat16 (see
    # https://github.com/numpy/numpy/blob/a47ecdea856986cd60eabbd53265c2ca5916ad5d/doc/source/user/basics.types.rst ).
    # Until Numpy adds bfloat16, we must convert to float32.
    convertible = on_cpu.to(torch.float32) if on_cpu.dtype == torch.bfloat16 else on_cpu
    return convertible.numpy()
def nested_detach(tensors):
    """
    Detach every torch tensor found in `tensors` (a tensor or a nested list/tuple/dict of them).

    Containers are rebuilt with their original type; leaves that are not torch tensors are returned unchanged.
    """
    if isinstance(tensors, torch.Tensor):
        return tensors.detach()
    if isinstance(tensors, (list, tuple)):
        return type(tensors)(nested_detach(item) for item in tensors)
    if isinstance(tensors, Mapping):
        return type(tensors)({key: nested_detach(value) for key, value in tensors.items()})
    return tensors
def nested_xla_mesh_reduce(tensors, name):
    """
    Apply `xm.mesh_reduce` (with `torch.cat` as the reduce function) to every leaf of `tensors`.

    `tensors` may be a single tensor or a nested list/tuple/mapping. Each nested element is reduced under the
    name `f"{name}_{i}"`, where `i` is its position in the enclosing container.

    Raises:
        ImportError: if torch_xla is not available.
    """
    if not is_torch_xla_available():
        raise ImportError("Torch xla must be installed to use `nested_xla_mesh_reduce`")

    import torch_xla.core.xla_model as xm

    if isinstance(tensors, (list, tuple)):
        return type(tensors)(nested_xla_mesh_reduce(item, f"{name}_{idx}") for idx, item in enumerate(tensors))
    if isinstance(tensors, Mapping):
        return type(tensors)(
            {key: nested_xla_mesh_reduce(value, f"{name}_{idx}") for idx, (key, value) in enumerate(tensors.items())}
        )

    # Leaves are promoted to at least 1-d so that `torch.cat` can join them.
    return xm.mesh_reduce(name, atleast_1d(tensors), torch.cat)
def distributed_concat(tensor: Any, num_total_examples: int | None = None) -> Any:
    """
    All-gather `tensor` across the processes of the default group and concatenate the pieces on the first axis.

    Lists, tuples and mappings are processed recursively and rebuilt with their original type.

    Args:
        tensor: A tensor, or a nested list/tuple/mapping of tensors.
        num_total_examples: If given, every concatenated tensor is truncated to this many leading rows.

    Raises:
        AssertionError: re-raised with a friendlier message when an `AssertionError` escapes the gathering code.
    """
    try:
        if isinstance(tensor, (tuple, list)):
            return type(tensor)(distributed_concat(item, num_total_examples) for item in tensor)
        if isinstance(tensor, Mapping):
            return type(tensor)({key: distributed_concat(value, num_total_examples) for key, value in tensor.items()})

        local = atleast_1d(tensor).contiguous()
        # One receive buffer per rank, each shaped like the local tensor.
        gathered = [local.clone() for _ in range(dist.get_world_size())]
        dist.all_gather(gathered, local)
        merged = torch.cat(gathered, dim=0)

        # truncate the dummy elements added by SequentialDistributedSampler
        return merged if num_total_examples is None else merged[:num_total_examples]
    except AssertionError:
        raise AssertionError("Not currently using distributed training")
def nested_gather(tensors, parallel_mode, name=None):
    """
    Gather `tensors` (a tensor or a nested list/tuple of tensors) across processes.

    The backend is chosen in this order: XLA mesh reduce when torch_xla is available, `smp_gather` when
    SageMaker model parallelism is enabled, `distributed_concat` when `parallel_mode` is
    `ParallelMode.DISTRIBUTED`. Otherwise the input is returned untouched. `None` is passed through as `None`.

    Args:
        tensors: The object to gather.
        parallel_mode: The `ParallelMode` of the current run.
        name: Name used for the XLA mesh reduce; defaults to `"nested_gather"`.
    """
    from .training_args import ParallelMode

    if tensors is None:
        return None
    if is_torch_xla_available():
        return nested_xla_mesh_reduce(tensors, "nested_gather" if name is None else name)
    if is_sagemaker_mp_enabled():
        return smp_gather(tensors)
    if parallel_mode == ParallelMode.DISTRIBUTED:
        return distributed_concat(tensors)
    return tensors
def is_attention_mask_causal(attention_mask):
    """
    Check if an attention mask is causal (compatible with causal attention).

    Context parallelism only supports causal attention patterns. This function
    checks if the provided attention mask is compatible.

    Only the first batch entry (and the first head, for 4D masks) is inspected.

    Args:
        attention_mask (`torch.Tensor`): The attention mask to check.

    Returns:
        `bool`: True if the mask is causal or compatible with causal attention.
    """
    # No mask is considered causal (model uses default causal masking)
    if attention_mask is None:
        return True

    rank = attention_mask.dim()
    # (batch_size, seq_len) - standard padding mask, compatible with causal attention
    if rank == 2:
        return True
    # For unknown dimensions, be conservative and reject
    if rank not in [3, 4]:
        return False

    # (batch_size, seq_len, seq_len) or (batch_size, num_heads, seq_len, seq_len)
    # Single token or empty is always causal
    if attention_mask.shape[-1] <= 1:
        return True

    # Take first batch and head (if 4D) for checking pattern
    sample = attention_mask[0, 0] if rank == 4 else attention_mask[0]

    # Keep only the entries strictly above the diagonal; for a causal mask these should be 0 or very
    # negative (like -inf). A small threshold absorbs float precision issues.
    above_diagonal = torch.triu(sample, diagonal=1)
    verdict = torch.all(above_diagonal <= 1e-6) or torch.all(above_diagonal < -1e4)
    return verdict.item() if isinstance(verdict, torch.Tensor) else verdict
def distributed_broadcast_scalars(
    scalars: list[int | float],
    num_total_examples: int | None = None,
    device: torch.device | None = torch.device("cuda"),
) -> torch.Tensor:
    """
    Turn `scalars` into a tensor on `device`, all-gather it across processes and concatenate the results.

    Args:
        scalars: The local values to share.
        num_total_examples: If given, the concatenated tensor is truncated to this many leading entries.
        device: Device on which the intermediate tensor is created.

    Raises:
        AssertionError: re-raised with a friendlier message when an `AssertionError` escapes the gathering code.
    """
    try:
        local = torch.tensor(scalars, device=device)
        # One receive buffer per rank, each shaped like the local tensor.
        gathered = [local.clone() for _ in range(dist.get_world_size())]
        dist.all_gather(gathered, local)
        merged = torch.cat(gathered, dim=0)

        # truncate the dummy elements added by SequentialDistributedSampler
        return merged if num_total_examples is None else merged[:num_total_examples]
    except AssertionError:
        raise AssertionError("Not currently using distributed training")
def reissue_pt_warnings(caught_warnings):
    """
    Re-emit previously caught warnings, skipping those whose category is exactly `UserWarning`.

    Nothing is re-emitted unless more than one warning was caught.
    """
    if len(caught_warnings) <= 1:
        return
    for caught in caught_warnings:
        if caught.category is not UserWarning:
            warnings.warn(caught.message, caught.category)
@contextmanager
def torch_distributed_zero_first(local_rank: int):
    """
    Context manager to make all processes in distributed training wait for each local_master to do something.

    Processes whose `local_rank` is neither -1 nor 0 wait on a barrier before entering the body. The process
    with `local_rank == 0` runs the body first and then joins the barrier, which releases the others. With
    `local_rank == -1` no barrier is used at all.

    Args:
        local_rank (`int`): The rank of the local process.
    """
    if local_rank not in [-1, 0]:
        dist.barrier()
    try:
        yield
    finally:
        # Join the barrier even when the body raised; otherwise the other ranks, which are
        # already blocked in `dist.barrier()` above, would wait forever.
        if local_rank == 0:
            dist.barrier()
class DistributedSamplerWithLoop(DistributedSampler):
    """
    Like a `torch.utils.data.distributed.DistributedSampler` but loops at the end back to the beginning of the
    shuffled samples to make each process have a round multiple of batch_size samples.

    Args:
        dataset (`torch.utils.data.Dataset`):
            Dataset used for sampling.
        batch_size (`int`):
            The batch size used with this sampler
        kwargs (`dict[str, Any]`, *optional*):
            All other keyword arguments passed to `DistributedSampler`.
    """

    def __init__(self, dataset, batch_size, **kwargs):
        super().__init__(dataset, **kwargs)
        self.batch_size = batch_size

    def __iter__(self):
        indices = list(super().__iter__())

        # Number of extra samples needed to reach the next multiple of the batch size.
        leftover = len(indices) % self.batch_size
        missing = self.batch_size - leftover if leftover != 0 else 0

        # DistributedSampler already added samples from the beginning to make the number of samples a round
        # multiple of the world size, so we skip those.
        offset = 1 if self.rank < len(self.dataset) % self.num_replicas else 0
        indices.extend(indices[offset : offset + missing])
        return iter(indices)
class EvalLoopContainer:
    """
    Container to store intermediate results of evaluation loop.

    Newly added objects are kept in `self.tensors` until `to_cpu_and_numpy` folds them into `self.arrays`.

    Args:
        do_nested_concat (`bool`, *optional*, defaults to `True`):
            If set to `True`, each iteration will recursively concatenate a new object containing tensors to
            the existing stored tensors, provided that the structure of the existing object and the new one
            are identical. If set to `False`, all newly added tensors will be stored in a list.
        padding_index (`int`, *optional*, defaults to -100):
            Value used to pad tensors of different shapes when `do_nested_concat=True`.
    """

    def __init__(self, do_nested_concat: bool = True, padding_index: int = -100):
        self.do_nested_concat = do_nested_concat
        self.padding_index = padding_index
        # Objects added since the last call to `to_cpu_and_numpy`.
        self.tensors = None
        # Everything already converted to numpy.
        self.arrays = None

    def add(self, tensors) -> None:
        """Add tensors to the stored objects. If `do_nested_concat=True`, the tensors will be concatenated recursively."""
        if self.tensors is None:
            # First addition: store as-is, or start the list when not concatenating.
            self.tensors = tensors if self.do_nested_concat else [tensors]
            return
        if self.do_nested_concat:
            self.tensors = nested_concat(self.tensors, tensors, padding_index=self.padding_index)
        else:
            self.tensors.append(tensors)

    def to_cpu_and_numpy(self) -> None:
        """Move tensors in stored objects to CPU and convert them to numpy arrays."""
        # Nothing pending: keep `self.arrays` as it is.
        if self.tensors is None:
            return

        converted = nested_numpify(self.tensors)
        if self.arrays is None:
            self.arrays = converted
        elif self.do_nested_concat:
            self.arrays = nested_concat(self.arrays, converted, padding_index=self.padding_index)
        else:
            self.arrays.extend(converted)

        # reset device tensors after adding to cpu
        self.tensors = None

    def get_arrays(self):
        """Returns the numpified and moved to CPU stored objects."""
        self.to_cpu_and_numpy()
        return self.arrays
def get_tpu_sampler(dataset: torch.utils.data.Dataset, batch_size: int):
    """
    Build the sampler for XLA runs: a `DistributedSampler` when the XLA world size is larger than one,
    a plain `RandomSampler` otherwise. `batch_size` is accepted but not used.
    """
    world_size = xr.world_size()
    if world_size > 1:
        return DistributedSampler(dataset, num_replicas=world_size, rank=xr.global_ordinal())
    return RandomSampler(dataset)
def nested_new_like(arrays, num_samples, padding_index=-100):
    """
    Create the same nested structure as `arrays` with a first dimension always at `num_samples`.

    Args:
        arrays: A numpy array, or a nested list/tuple of numpy arrays.
        num_samples: Size of the first dimension of every array that is created.
        padding_index: Value every new array is filled with.

    Returns:
        New arrays (same dtype and trailing dimensions as the inputs) arranged in containers of the same type.
    """
    if isinstance(arrays, (list, tuple)):
        # Forward `padding_index` so nested arrays are filled with the requested value, not the default.
        return type(arrays)(nested_new_like(item, num_samples, padding_index=padding_index) for item in arrays)
    return np.full_like(arrays, padding_index, shape=(num_samples, *arrays.shape[1:]))
def expand_like(arrays, new_seq_length, padding_index=-100):
    """
    Return a copy of `arrays` whose second dimension is `new_seq_length`.

    The original content occupies the leading positions of the second dimension; the remainder is filled
    with `padding_index`.
    """
    batch, old_seq_length = arrays.shape[0], arrays.shape[1]
    target_shape = (batch, new_seq_length) + arrays.shape[2:]
    expanded = np.full_like(arrays, padding_index, shape=target_shape)
    expanded[:, :old_seq_length] = arrays
    return expanded
def nested_truncate(tensors, limit):
    """
    Keep only the first `limit` entries of every leaf in `tensors`.

    Lists, tuples and mappings are traversed recursively and rebuilt with their original type; any other
    object is sliced with `[:limit]`.
    """
    if isinstance(tensors, Mapping):
        return type(tensors)({key: nested_truncate(value, limit) for key, value in tensors.items()})
    if isinstance(tensors, (list, tuple)):
        return type(tensors)(nested_truncate(item, limit) for item in tensors)
    return tensors[:limit]
- @dataclass
- class LabelSmoother:
- """
- Adds label-smoothing on a pre-computed output from a Transformers model.
- Args:
- epsilon (`float`, *optional*, defaults to 0.1):
- The label smoothing factor.
- ignore_index (`int`, *optional*, defaults to -100):
- The index in the labels to ignore when computing the loss.
- """
- epsilon: float = 0.1
- ignore_index: int = -100
- def __call__(self, model_output, labels, shift_labels=False):
- logits = model_output["logits"] if isinstance(model_output, dict) else model_output[0]
- if shift_labels:
- logits = logits[..., :-1, :].contiguous()
- labels = labels[..., 1:].contiguous()
- log_probs = -nn.functional.log_softmax(logits, dim=-1)
- if labels.dim() == log_probs.dim() - 1:
- labels = labels.unsqueeze(-1)
- padding_mask = labels.eq(self.ignore_index)
- # In case the ignore_index is -100, the gather will fail, so we replace labels by 0. The padding_mask
- # will ignore them in any case.
- labels = torch.clamp(labels, min=0)
- nll_loss = log_probs.gather(dim=-1, index=labels)
- # works for fp16 input tensor too, by internally upcasting it to fp32
- smoothed_loss = log_probs.sum(dim=-1, keepdim=True, dtype=torch.float32)
- nll_loss.masked_fill_(padding_mask, 0.0)
- smoothed_loss.masked_fill_(padding_mask, 0.0)
- # Take the mean over the label dimensions, then divide by the number of active elements (i.e. not-padded):
- num_active_elements = padding_mask.numel() - padding_mask.long().sum()
- nll_loss = nll_loss.sum() / num_active_elements
- smoothed_loss = smoothed_loss.sum() / (num_active_elements * log_probs.shape[-1])
- return (1 - self.epsilon) * nll_loss + self.epsilon * smoothed_loss
def get_length_grouped_indices(lengths, batch_size, mega_batch_mult=None, generator=None):
    """
    Return a list of indices so that each slice of `batch_size` consecutive indices correspond to elements of similar
    lengths. To do this, the indices are:

    - randomly permuted
    - grouped in mega-batches of size `mega_batch_mult * batch_size`
    - sorted by length in each mega-batch

    The result is the concatenation of all mega-batches, with the batch of `batch_size` containing the element of
    maximum length placed first, so that an OOM happens sooner rather than later.

    Args:
        lengths: Length of every dataset item, indexable by item position.
        batch_size: Number of consecutive indices that should have similar lengths.
        mega_batch_mult: Number of batches per mega-batch. Defaults to 50 or the number that yields four
            mega-batches, whichever is smaller (and at least 1).
        generator: Optional `torch.Generator` driving the random permutation.

    Returns:
        `list[int]`: a permutation of `range(len(lengths))`; an empty list when `lengths` is empty.
    """
    num_items = len(lengths)
    # Nothing to group; without this guard the argmax below would fail on an empty tensor.
    if num_items == 0:
        return []

    # Default for mega_batch_mult: 50 or the number to get 4 megabatches, whichever is smaller.
    if mega_batch_mult is None:
        mega_batch_mult = min(num_items // (batch_size * 4), 50)
        # Just in case, for tiny datasets
        if mega_batch_mult == 0:
            mega_batch_mult = 1

    # We need to use torch for the random part as a distributed sampler will set the random seed for torch.
    indices = torch.randperm(num_items, generator=generator)
    megabatch_size = mega_batch_mult * batch_size
    megabatches = [indices[i : i + megabatch_size].tolist() for i in range(0, num_items, megabatch_size)]
    megabatches = [sorted(megabatch, key=lambda i: lengths[i], reverse=True) for megabatch in megabatches]

    # The rest is to get the biggest batch first.
    # Since each megabatch is sorted by descending length, the longest element is the first
    megabatch_maximums = [lengths[megabatch[0]] for megabatch in megabatches]
    max_idx = torch.argmax(torch.tensor(megabatch_maximums)).item()
    # Switch to put the longest element in first position
    megabatches[0][0], megabatches[max_idx][0] = megabatches[max_idx][0], megabatches[0][0]

    return [i for megabatch in megabatches for i in megabatch]
- class LengthGroupedSampler(Sampler):
- r"""
- Sampler that samples indices in a way that groups together features of the dataset of roughly the same length while
- keeping a bit of randomness.
- """
- def __init__(
- self,
- batch_size: int,
- dataset: Dataset | None = None,
- lengths: list[int] | None = None,
- model_input_name: str | None = None,
- generator=None,
- ):
- if dataset is None and lengths is None:
- raise ValueError("One of dataset and lengths must be provided.")
- self.batch_size = batch_size
- if lengths is None:
- model_input_name = model_input_name if model_input_name is not None else "input_ids"
- if not isinstance(dataset[0], (dict, BatchEncoding)) or model_input_name not in dataset[0]:
- raise ValueError(
- "Can only automatically infer lengths for datasets whose items are dictionaries with an "
- f"'{model_input_name}' key."
- )
- lengths = [len(feature[model_input_name]) for feature in dataset]
- elif isinstance(lengths, torch.Tensor):
- logger.info(
- "If lengths is a torch.Tensor, LengthGroupedSampler will be slow. Converting lengths to list[int]..."
- )
- lengths = lengths.tolist()
- self.lengths = lengths
- self.generator = generator
- def __len__(self):
- return len(self.lengths)
- def __iter__(self):
- indices = get_length_grouped_indices(self.lengths, self.batch_size, generator=self.generator)
- return iter(indices)
- class DistributedLengthGroupedSampler(DistributedSampler):
- r"""
- Distributed Sampler that samples indices in a way that groups together features of the dataset of roughly the same
- length while keeping a bit of randomness.
- """
- # Copied and adapted from PyTorch DistributedSampler.
- def __init__(
- self,
- batch_size: int,
- dataset: Dataset | None = None,
- num_replicas: int | None = None,
- rank: int | None = None,
- seed: int = 0,
- drop_last: bool = False,
- lengths: list[int] | None = None,
- model_input_name: str | None = None,
- ):
- if dataset is None and lengths is None:
- raise ValueError("One of dataset and lengths must be provided.")
- if num_replicas is None:
- if not dist.is_available():
- raise RuntimeError("Requires distributed package to be available")
- num_replicas = dist.get_world_size()
- if rank is None:
- if not dist.is_available():
- raise RuntimeError("Requires distributed package to be available")
- rank = dist.get_rank()
- self.batch_size = batch_size
- self.num_replicas = num_replicas
- self.rank = rank
- self.epoch = 0
- self.drop_last = drop_last
- if lengths is None:
- model_input_name = model_input_name if model_input_name is not None else "input_ids"
- if not isinstance(dataset[0], (dict, BatchEncoding)) or model_input_name not in dataset[0]:
- raise ValueError(
- "Can only automatically infer lengths for datasets whose items are dictionaries with an "
- f"'{model_input_name}' key."
- )
- lengths = [len(feature[model_input_name]) for feature in dataset]
- elif isinstance(lengths, torch.Tensor):
- logger.info(
- "If lengths is a torch.Tensor, DistributedLengthGroupedSampler will be slow. Converting lengths to"
- " list[int]..."
- )
- lengths = lengths.tolist()
- self.lengths = lengths
- # If the dataset length is evenly divisible by # of replicas, then there
- # is no need to drop any data, since the dataset will be split equally.
- if self.drop_last and len(self.lengths) % self.num_replicas != 0:
- # Split to nearest available length that is evenly divisible.
- # This is to ensure each rank receives the same amount of data when
- # using this Sampler.
- self.num_samples = math.ceil((len(self.lengths) - self.num_replicas) / self.num_replicas)
- else:
- self.num_samples = math.ceil(len(self.lengths) / self.num_replicas)
- self.total_size = self.num_samples * self.num_replicas
- self.seed = seed
- def __iter__(self) -> Iterator:
- # Deterministically shuffle based on epoch and seed
- g = torch.Generator()
- g.manual_seed(self.seed + self.epoch)
- indices = get_length_grouped_indices(self.lengths, self.batch_size, generator=g)
- if not self.drop_last:
- # add extra samples to make it evenly divisible
- indices += indices[: (self.total_size - len(indices))]
- else:
- # remove tail of data to make it evenly divisible
- indices = indices[: self.total_size]
- assert len(indices) == self.total_size
- # subsample
- indices = indices[self.rank : self.total_size : self.num_replicas]
- assert len(indices) == self.num_samples
- return iter(indices)
class ShardSampler(Sampler):
    """
    Sampler that shards batches between several processes. Dispatches indices batch by batch: on 2 processes with batch
    size 4, the first two batches are `[0, 1, 2, 3, 4, 5, 6, 7]` and `[8, 9, 10, 11, 12, 13, 14, 15]`, which shard into
    `[0, 1, 2, 3]` and `[8, 9, 10, 11]` for GPU-0 and `[4, 5, 6, 7]` and `[12, 13, 14, 15]` for GPU-1.

    The sampler thus yields `[0, 1, 2, 3, 8, 9, 10, 11]` on GPU-0 and `[4, 5, 6, 7, 12, 13, 14, 15]` on GPU-1.
    """

    def __init__(
        self,
        dataset: Dataset,
        batch_size: int = 1,
        drop_last: bool = False,
        num_processes: int = 1,
        process_index: int = 0,
    ):
        self.dataset = dataset
        self.batch_size = batch_size
        self.drop_last = drop_last
        self.num_processes = num_processes
        self.process_index = process_index

        # One "total" batch holds one per-process batch for every process.
        self.total_batch_size = batch_size * num_processes
        if drop_last:
            batch_count = len(dataset) // self.total_batch_size
        else:
            batch_count = math.ceil(len(dataset) / self.total_batch_size)
        self.total_num_samples = batch_count * self.total_batch_size

    def __iter__(self):
        pool = list(range(len(self.dataset)))

        # Add extra samples to make it evenly divisible. While loop is there in the edge case we have a tiny dataset
        # and it needs to be done several times.
        while len(pool) < self.total_num_samples:
            pool.extend(pool[: self.total_num_samples - len(pool)])

        # This process owns one `batch_size`-wide window inside every total batch.
        first_start = self.batch_size * self.process_index
        window_starts = range(first_start, self.total_num_samples, self.total_batch_size)
        return iter([idx for start in window_starts for idx in pool[start : start + self.batch_size]])

    def __len__(self):
        # Each shard only sees a fraction of total_num_samples.
        return self.total_num_samples // self.num_processes
- class IterableDatasetShard(IterableDataset):
- """
- Wraps a PyTorch `IterableDataset` to generate samples for one of the processes only. Instances of this class will
- always yield a number of samples that is a round multiple of the actual batch size (which is `batch_size x
- num_processes`). Depending on the value of the `drop_last` attribute, it will either stop the iteration at the
- first batch that would be too small or loop with indices from the beginning.
- On two processes with an iterable dataset yielding of `[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]` with a batch size of
- 2:
- - the shard on process 0 will yield `[0, 1, 4, 5, 8, 9]` so will see batches `[0, 1]`, `[4, 5]`, `[8, 9]`
- - the shard on process 1 will yield `[2, 3, 6, 7, 10, 11]` so will see batches `[2, 3]`, `[6, 7]`, `[10, 11]`
- <Tip warning={true}>
- If your IterableDataset implements some randomization that needs to be applied the same way on all processes
- (for instance, a shuffling), you should use a `torch.Generator` in a `generator` attribute of the `dataset` to
- generate your random numbers and call the [`~trainer_pt_utils.IterableDatasetShard.set_epoch`] method of this
- object. It will set the seed of this `generator` to `seed + epoch` on all processes before starting the
- iteration. Alternatively, you can also implement a `set_epoch()` method in your iterable dataset to deal with
- this.
- </Tip>
- Args:
- dataset (`torch.utils.data.IterableDataset`):
- The batch sampler to split in several shards.
- batch_size (`int`, *optional*, defaults to 1):
- The size of the batches per shard.
- drop_last (`bool`, *optional*, defaults to `False`):
- Whether or not to drop the last incomplete batch or complete the last batches by using the samples from the
- beginning.
- num_processes (`int`, *optional*, defaults to 1):
- The number of processes running concurrently.
- process_index (`int`, *optional*, defaults to 0):
- The index of the current process.
- seed (`int`, *optional*, defaults to 0):
- A random seed that will be used for the random number generation in
- [`~trainer_pt_utils.IterableDatasetShard.set_epoch`].
- """
- def __init__(
- self,
- dataset: IterableDataset,
- batch_size: int = 1,
- drop_last: bool = False,
- num_processes: int = 1,
- process_index: int = 0,
- seed: int = 0,
- ):
- self.dataset = dataset
- self.batch_size = batch_size
- self.drop_last = drop_last
- self.num_processes = num_processes
- self.process_index = process_index
- self.seed = seed
- self.epoch = 0
- self.num_examples = 0
def set_epoch(self, epoch):
    """Remember `epoch` and forward it to the wrapped dataset when that dataset exposes `set_epoch`."""
    self.epoch = epoch
    if not hasattr(self.dataset, "set_epoch"):
        return
    self.dataset.set_epoch(epoch)
def __iter__(self):
    """
    Yield the elements of the wrapped dataset that belong to this process.

    Elements are buffered until a global batch of `batch_size * num_processes` items is available; the
    `batch_size` items at this process' offset are then yielded. Unless `drop_last` is set, a trailing
    incomplete batch is padded with the items of the first batch before being sliced the same way.
    """
    self.num_examples = 0

    source = self.dataset
    # Only reseed the dataset's generator when the dataset has no `set_epoch` hook of its own.
    if not hasattr(source, "set_epoch"):
        if hasattr(source, "generator") and isinstance(source.generator, torch.Generator):
            source.generator.manual_seed(self.seed + self.epoch)

    global_batch_size = self.batch_size * self.num_processes
    shard_start = self.process_index * self.batch_size
    shard_indices = range(shard_start, shard_start + self.batch_size)

    padding_source = None
    pending = []
    for item in source:
        self.num_examples += 1
        pending.append(item)
        if len(pending) != global_batch_size:
            # Keep buffering until a full global batch is available.
            continue
        yield from (pending[i] for i in shard_indices)
        if padding_source is None:
            padding_source = pending.copy()
        pending = []

    # With drop_last the leftover items are discarded; otherwise they are completed from the beginning.
    if self.drop_last or not pending:
        return
    if padding_source is None:
        # The dataset was shorter than one global batch: pad it with itself.
        padding_source = pending.copy()
    while len(pending) < global_batch_size:
        pending.extend(padding_source)
    yield from (pending[i] for i in shard_indices)
def __len__(self):
    """
    Return the number of elements this process will yield.

    Raises whatever `len()` raises when the underlying dataset is not sized.
    """
    total = len(self.dataset)
    global_batch_size = self.batch_size * self.num_processes
    if self.drop_last:
        num_batches = total // global_batch_size
    else:
        num_batches = math.ceil(total / global_batch_size)
    return num_batches * self.batch_size
- def _secs2timedelta(secs):
- """
- Convert seconds to hh:mm:ss.msec, msecs rounded to 2 decimal places.
- """
- msec = int(abs(secs - int(secs)) * 100)
- return f"{datetime.timedelta(seconds=int(secs))}.{msec:02d}"
- def metrics_format(metrics: dict[str, float]) -> dict[str, float]:
- """
- Reformat Trainer metrics values to a human-readable format.
- Args:
- metrics (`dict[str, float]`):
- The metrics returned from train/evaluate/predict
- Returns:
- metrics (`dict[str, float]`): The reformatted metrics
- """
- metrics_copy = metrics.copy()
- for k, v in metrics_copy.items():
- if "_mem_" in k:
- metrics_copy[k] = f"{v >> 20}MB"
- elif "_runtime" in k:
- metrics_copy[k] = _secs2timedelta(v)
- elif k == "total_flos":
- metrics_copy[k] = f"{int(v) >> 30}GF"
- elif isinstance(metrics_copy[k], float):
- metrics_copy[k] = round(v, 4)
- return metrics_copy
# Trainer helper method: imported into the Trainer class and used as a method (takes `self` as first argument).
def log_metrics(self, split, metrics):
    """
    Log metrics in a specially formatted way.

    Under distributed environment this is done only for a process with rank 0.

    Args:
        split (`str`):
            Mode/split name: one of `train`, `eval`, `test`
        metrics (`dict[str, float]`):
            The metrics returned from train/evaluate/predict. An empty dict only prints the header line.

    Notes on memory reports:

    In order to get memory usage report you need to install `psutil`. You can do that with `pip install psutil`.

    Now when this method is run, you will see a report that will include:

    ```
    init_mem_cpu_alloc_delta = 1301MB
    init_mem_cpu_peaked_delta = 154MB
    init_mem_gpu_alloc_delta = 230MB
    init_mem_gpu_peaked_delta = 0MB
    train_mem_cpu_alloc_delta = 1345MB
    train_mem_cpu_peaked_delta = 0MB
    train_mem_gpu_alloc_delta = 693MB
    train_mem_gpu_peaked_delta = 7MB
    ```

    **Understanding the reports:**

    - the first segment, e.g., `train__`, tells you which stage the metrics are for. Reports starting with `init_`
        will be added to the first stage that gets run. So that if only evaluation is run, the memory usage for the
        `__init__` will be reported along with the `eval_` metrics.
    - the third segment, is either `cpu` or `gpu`, tells you whether it's the general RAM or the gpu0 memory
        metric.
    - `*_alloc_delta` - is the difference in the used/allocated memory counter between the end and the start of the
        stage - it can be negative if a function released more memory than it allocated.
    - `*_peaked_delta` - is any extra memory that was consumed and then freed - relative to the current allocated
        memory counter - it is never negative. When you look at the metrics of any stage you add up `alloc_delta` +
        `peaked_delta` and you know how much memory was needed to complete that stage.

    The reporting happens only for process of rank 0 and gpu 0 (if there is a gpu). Typically this is enough since the
    main process does the bulk of work, but it could be not quite so if model parallel is used and then other GPUs may
    use a different amount of gpu memory. This is also not the same under DataParallel where gpu0 may require much more
    memory than the rest since it stores the gradient and optimizer states for all participating GPUs. Perhaps in the
    future these reports will evolve to measure those too.

    The CPU RAM metric measures RSS (Resident Set Size) includes both the memory which is unique to the process and the
    memory shared with other processes. It is important to note that it does not include swapped out memory, so the
    reports could be imprecise.

    The CPU peak memory is measured using a sampling thread. Due to python's GIL it may miss some of the peak memory if
    that thread didn't get a chance to run when the highest memory was used. Therefore this report can be less than
    reality. Using `tracemalloc` would have reported the exact peak memory, but it doesn't report memory allocations
    outside of python. So if some C++ CUDA extension allocated its own memory it won't be reported. And therefore it
    was dropped in favor of the memory sampling approach, which reads the current process memory usage.

    The GPU allocated and peak memory reporting is done with `torch.cuda.memory_allocated()` and
    `torch.cuda.max_memory_allocated()`. This metric reports only "deltas" for pytorch-specific allocations, as
    `torch.cuda` memory management system doesn't track any memory allocated outside of pytorch. For example, the very
    first cuda call typically loads CUDA kernels, which may take from 0.5 to 2GB of GPU memory.

    Note that this tracker doesn't account for memory allocations outside of [`Trainer`]'s `__init__`, `train`,
    `evaluate` and `predict` calls.

    Because `evaluation` calls may happen during `train`, we can't handle nested invocations because
    `torch.cuda.max_memory_allocated` is a single counter, so if it gets reset by a nested eval call, `train`'s tracker
    will report incorrect info. If this [pytorch issue](https://github.com/pytorch/pytorch/issues/16266) gets resolved
    it will be possible to change this class to be re-entrant. Until then we will only track the outer level of
    `train`, `evaluate` and `predict` methods. Which means that if `eval` is called during `train`, it's the latter
    that will account for its memory usage and that of the former.

    This also means that if any other tool that is used along the [`Trainer`] calls
    `torch.cuda.reset_peak_memory_stats`, the gpu peak memory stats could be invalid. And the [`Trainer`] will disrupt
    the normal behavior of any such tools that rely on calling `torch.cuda.reset_peak_memory_stats` themselves.

    For best performance you may want to consider turning the memory profiling off for production runs.
    """
    if not self.is_world_process_zero():
        return

    print(f"***** {split} metrics *****")
    metrics_formatted = metrics_format(metrics)
    # `default=0` keeps the column-width computation from raising `ValueError` on an empty metrics dict.
    k_width = max((len(str(x)) for x in metrics_formatted), default=0)
    v_width = max((len(str(x)) for x in metrics_formatted.values()), default=0)
    for key in sorted(metrics_formatted.keys()):
        print(f" {key: <{k_width}} = {metrics_formatted[key]:>{v_width}}")
# Trainer helper method
def save_metrics(self, split, metrics, combined=True):
    """
    Write the metrics of one split to `<output_dir>/<split>_results.json`, e.g. `train_results.json`.

    Under distributed environment this is done only for a process with rank 0.

    Args:
        split (`str`):
            Mode/split name: one of `train`, `eval`, `test`, `all`
        metrics (`dict[str, float]`):
            The metrics returned from train/evaluate/predict
        combined (`bool`, *optional*, defaults to `True`):
            Creates combined metrics by updating `all_results.json` with metrics of this call

    To understand the metrics please read the docstring of [`~Trainer.log_metrics`]. The only difference is that raw
    unformatted numbers are saved in the current method.
    """
    if not self.is_world_process_zero():
        return

    split_path = os.path.join(self.args.output_dir, f"{split}_results.json")
    with open(split_path, "w") as handle:
        json.dump(metrics, handle, indent=4, sort_keys=True)

    if not combined:
        return

    # Merge this call's metrics into whatever earlier calls already stored in the combined file.
    combined_path = os.path.join(self.args.output_dir, "all_results.json")
    merged = {}
    if os.path.exists(combined_path):
        with open(combined_path) as handle:
            merged = json.load(handle)
    merged.update(metrics)
    with open(combined_path, "w") as handle:
        json.dump(merged, handle, indent=4, sort_keys=True)
# Trainer helper method
def save_state(self):
    """
    Save the Trainer state to `<output_dir>/trainer_state.json`, since Trainer.save_model saves only the tokenizer
    with the model.

    Under distributed environment this is done only for a process with rank 0.
    """
    if self.is_world_process_zero():
        state_file = os.path.join(self.args.output_dir, "trainer_state.json")
        self.state.save_to_json(state_file)
# Trainer helper method
def get_num_trainable_parameters(self) -> int:
    """
    Count the elements of every parameter of `self.model` that has `requires_grad` set.
    """
    trainable = 0
    for parameter in self.model.parameters():
        if parameter.requires_grad:
            trainable += parameter.numel()
    return trainable
# Trainer helper method
def get_learning_rates(self) -> list[float]:
    """
    Return the `"lr"` entry of every parameter group of `self.optimizer`, in group order.

    Raises:
        ValueError: If `self.optimizer` is `None`.
    """
    optimizer = self.optimizer
    if optimizer is None:
        raise ValueError("Trainer optimizer is None, please make sure you have setup the optimizer before.")

    rates = []
    for group in optimizer.param_groups:
        rates.append(group["lr"])
    return rates
- # Trainer helper method
- def get_optimizer_group(self, param: str | torch.nn.parameter.Parameter | None = None):
- """
- Returns optimizer group for a parameter if given, else returns all optimizer groups for params.
- Args:
- param (`str` or `torch.nn.parameter.Parameter`, *optional*):
- The parameter for which optimizer group needs to be returned.
- """
- if self.optimizer is None:
- raise ValueError("Trainer optimizer is None, please make sure you have setup the optimizer before.")
- if param is not None:
- for group in self.optimizer.param_groups:
- if param in group["params"]:
- return group
- return [group["params"] for group in self.optimizer.param_groups]
def get_model_param_count(model, trainable_only=False):
    """
    Sum the element counts of the model's parameters; with `trainable_only=True` only parameters whose
    `requires_grad` is set are counted.

    When DeepSpeed ZeRO-3 is enabled, a parameter's `ds_numel` attribute is used in place of `numel()` whenever
    the parameter has one.
    """
    zero3_enabled = is_deepspeed_zero3_enabled()

    def numel(p):
        if zero3_enabled and hasattr(p, "ds_numel"):
            return p.ds_numel
        return p.numel()

    total = 0
    for p in model.parameters():
        if not trainable_only or p.requires_grad:
            total += numel(p)
    return total
def get_parameter_names(model, forbidden_layer_types, forbidden_layer_names=None):
    """
    Collect the dotted names of the model parameters that are not inside a forbidden layer.

    A name is dropped when the child module that owns it is an instance of one of `forbidden_layer_types`, or when
    any regex in `forbidden_layer_names` matches the lower-cased name.
    """
    if forbidden_layer_names is None:
        name_patterns = []
    else:
        name_patterns = [re.compile(pattern) for pattern in forbidden_layer_names]

    def name_is_forbidden(candidate):
        lowered = candidate.lower()
        return any(pattern.search(lowered) for pattern in name_patterns)

    kept = []
    for child_name, child in model.named_children():
        for inner_name in get_parameter_names(child, forbidden_layer_types, forbidden_layer_names):
            if isinstance(child, tuple(forbidden_layer_types)):
                continue
            qualified = f"{child_name}.{inner_name}"
            if name_is_forbidden(qualified):
                continue
            kept.append(qualified)

    # Add model specific parameters that are not in any child
    for own_name in model._parameters:
        if not name_is_forbidden(own_name):
            kept.append(own_name)
    return kept
def get_module_class_from_name(module, name):
    """
    Search a module tree, depth first, for a module whose class is called `name`.

    Args:
        module (`torch.nn.Module`): The module to get the class from.
        name (`str`): The name of the class.

    Returns:
        The matching class, or `None` when neither `module` nor any of its descendants has that class name.
    """
    children = list(module.children())
    own_class = module.__class__
    if own_class.__name__ == name:
        return own_class

    for child in children:
        found = get_module_class_from_name(child, name)
        if found is not None:
            return found
    return None
def remove_dummy_checkpoint(is_main_process, output_dir, filenames):
    """Delete each of `filenames` that exists as a regular file in `output_dir`; a no-op off the main process."""
    if not is_main_process:
        return
    for filename in filenames:
        candidate = os.path.join(output_dir, filename)
        if not os.path.isfile(candidate):
            continue
        os.remove(candidate)
if is_sagemaker_mp_enabled():
    import smdistributed.modelparallel.torch as smp

    @smp.step()
    def smp_forward_backward(model, inputs, gradient_accumulation_steps=1):
        """Run a forward and backward pass; the returned loss is divided by `gradient_accumulation_steps`."""
        outputs = model(**inputs)
        if isinstance(outputs, dict):
            loss = outputs["loss"]
        else:
            loss = outputs[0]
        loss /= gradient_accumulation_steps
        model.backward(loss)
        return loss

    @smp.step()
    def smp_forward_only(model, inputs):
        """Run a forward pass only and return the model outputs."""
        return model(**inputs)

    def smp_gather(tensor):
        """Gather a tensor, or a nested list/tuple/dict of tensors, across `smp.CommGroup.DP_GROUP`."""
        if isinstance(tensor, (list, tuple)):
            return type(tensor)(smp_gather(t) for t in tensor)
        if isinstance(tensor, dict):
            return type(tensor)({k: smp_gather(v) for k, v in tensor.items()})
        if not isinstance(tensor, torch.Tensor):
            raise TypeError(
                f"Can't gather the values of type {type(tensor)}, only of nested list/tuple/dicts of tensors."
            )
        gathered = smp.allgather(tensor, smp.CommGroup.DP_GROUP)
        # Each piece is made at least 1-d so that the pieces can be concatenated along dim 0.
        return torch.cat([atleast_1d(t).cpu() for t in gathered], dim=0)

    def smp_nested_concat(tensor):
        """Detach, concatenate and move to CPU every leaf of a nested list/tuple/dict structure."""
        if isinstance(tensor, (list, tuple)):
            return type(tensor)(smp_nested_concat(t) for t in tensor)
        if isinstance(tensor, dict):
            return type(tensor)({k: smp_nested_concat(v) for k, v in tensor.items()})
        # It doesn't seem possible to check here if `tensor` is a StepOutput because StepOutput lives in `smp.step`
        # which is also the name of the decorator so Python is confused.
        return tensor.detach().concat().cpu()
- @dataclass
- class AcceleratorConfig:
- """
- A subset of arguments relating to the underlying [`accelerate.Accelerator`]
- implementation utilized in the `Trainer` that can be customized.
- Mostly relating to data.
- Parameters:
- split_batches (`bool`, *optional*, defaults to `False`):
- Whether or not the accelerator should split the batches yielded by the dataloaders across the devices. If
- `True` the actual batch size used will be the same on any kind of distributed processes, but it must be a
- round multiple of the `num_processes` you are using. If `False`, actual batch size used will be the one set
- in your script multiplied by the number of processes.
- dispatch_batches (`bool`, *optional*):
- If set to `True`, the dataloader prepared by the Accelerator is only iterated through on the main process
- and then the batches are split and broadcast to each process. Will default to `True` for `DataLoader` whose
- underlying dataset is an `IterableDataset`, `False` otherwise.
- even_batches (`bool`, *optional*, defaults to `True`):
- If set to `True`, in cases where the total batch size across all processes does not exactly divide the
- dataset, samples at the start of the dataset will be duplicated so the batch can be divided equally among
- all workers.
- use_seedable_sampler (`bool`, *optional*, defaults to `True`):
- Whether or not use a fully seedable random sampler ([`accelerate.data_loader.SeedableRandomSampler`]). Ensures
- training results are fully reproducible using a different sampling technique. While seed-to-seed results
- may differ, on average the differences are negligible when using multiple different seeds to compare. Should
- also be ran with [`~utils.set_seed`] for the best results.
- gradient_accumulation_kwargs (`dict`, *optional*):
- Additional kwargs to configure gradient accumulation, see [`accelerate.utils.GradientAccumulationPlugin`].
- Any of the following (optional) keys are acceptable:
- num_steps (`int`): Will take precedence over [`~.TrainingArguments.gradient_accumulation_steps`] if
- the latter is set to 1, otherwise an exception will be raised.
- sync_each_batch (`bool`): Whether to synchronize the gradients at each data batch.
- The [`accelerate.utils.GradientAccumulationPlugin`] default is `False`.
- non_blocking (`bool`, *optional*, defaults to `False`):
- Whether to use non-blocking CUDA calls to help minimize synchronization during
- distributed training with prepared `DataLoader` inputs being moved to device.
- Best if used with `pin_memory=True` in the `TrainingArguments`.
- use_configured_state (`bool*, *optional*, defaults to `False`):
- Whether or not to use a pre-configured `AcceleratorState` or `PartialState` defined
- before calling `TrainingArguments`. If `True`, an `Accelerator` or `PartialState`
- must be initialized. May lead to issues using sweeps or hyperparameter tuning.
- """
- # Data related arguments
- split_batches: bool = field(
- default=False,
- metadata={
- "help": "Whether or not the accelerator should split the batches yielded by the dataloaders across the devices. If"
- " `True` the actual batch size used will be the same on any kind of distributed processes, but it must be a"
- " round multiple of the `num_processes` you are using. If `False`, actual batch size used will be the one set"
- " in your script multiplied by the number of processes."
- },
- )
- dispatch_batches: bool | None = field(
- default=None,
- metadata={
- "help": "If set to `True`, the dataloader prepared by the Accelerator is only iterated through on the main process"
- " and then the batches are split and broadcast to each process. Will default to `True` for `DataLoader` whose"
- " underlying dataset is an `IterableDataslet`, `False` otherwise."
- },
- )
- even_batches: bool = field(
- default=True,
- metadata={
- "help": "If set to `True`, in cases where the total batch size across all processes does not exactly divide the"
- " dataset, samples at the start of the dataset will be duplicated so the batch can be divided equally among"
- " all workers."
- },
- )
- use_seedable_sampler: bool = field(
- default=True,
- metadata={
- "help": "Whether or not use a fully seedable random sampler ([`accelerate.data_loader.SeedableRandomSampler`])."
- "Ensures training results are fully reproducible using a different sampling technique. "
- "While seed-to-seed results may differ, on average the differences are negligible when using"
- "multiple different seeds to compare. Should also be ran with [`~utils.set_seed`] for the best results."
- },
- )
- non_blocking: bool = field(
- default=False,
- metadata={
- "help": "Whether to use non-blocking CUDA calls to help minimize synchronization during "
- "distributed training with prepared `DataLoader` inputs being moved to device. "
- "Best if used with `pin_memory=True` in the `TrainingArguments`. Requires accelerate "
- "v0.30.0."
- },
- )
- gradient_accumulation_kwargs: dict | None = field(
- default=None,
- metadata={
- "help": "Additional kwargs to configure gradient accumulation, see [`accelerate.utils.GradientAccumulationPlugin`]. "
- "Any of the following (optional) keys are acceptable: "
- " num_steps (`int`): Will take precedence over [`~.TrainingArguments.gradient_accumulation_steps`] if "
- " the latter is set to 1, otherwise an exception will be raised. "
- " sync_each_batch (`bool`): Whether to synchronize the gradients at each data batch. "
- " The [`accelerate.utils.GradientAccumulationPlugin`] default is `False`."
- },
- )
- use_configured_state: bool = field(
- default=False,
- metadata={
- "help": "Whether or not to use a pre-configured `AcceleratorState` or `PartialState` defined before calling `TrainingArguments`."
- "If `True`, an `Accelerator` or `PartialState` must be initialized. May lead to issues using sweeps or hyperparameter tuning."
- },
- )
- @classmethod
- def from_json_file(cls, json_file):
- # Check if exists
- open_file = io.open if os.path.exists(json_file) else open
- with open_file(json_file, "r", encoding="utf-8") as f:
- config_dict = json.load(f)
- # Check for keys and load sensible defaults
- extra_keys = sorted(key for key in config_dict if key not in cls.__dataclass_fields__)
- if len(extra_keys) > 0:
- raise ValueError(
- f"The config file at {json_file} had unknown keys ({extra_keys}), please try upgrading your `transformers`"
- " version or fix (and potentially remove these keys) from your config file."
- )
- return cls(**config_dict)
- def to_dict(self):
- return copy.deepcopy(self.__dict__)
- def pop(self, key, default=None):
- return self.__dict__.pop(key, default)
- class LayerWiseDummyOptimizer(torch.optim.Optimizer):
- """
- For Layer-wise optimizers such as GaLoRE optimizer, the optimization
- step is already done through the post gradient hooks. Therefore
- the trick is to create a dummy optimizer that can take arbitrary
- args and kwargs and return a no-op during training.
- Initial idea from @hiyouga in LLaMA-Factory:
- https://github.com/hiyouga/LLaMA-Factory/commit/8664262cde3919e10eaecbd66e8c5d356856362e#diff-ebe08ab14496dfb9e06075f0fdd36799ef6d1535cc4dd4715b74c4e3e06fe3ba
- """
- def __init__(self, optimizer_dict=None, **kwargs):
- dummy_tensor = torch.randn(1, 1)
- self.optimizer_dict = optimizer_dict
- super().__init__([dummy_tensor], {"lr": kwargs.get("lr", 1e-03)})
- def zero_grad(self, set_to_none: bool = True) -> None:
- pass
- def step(self, closure=None) -> float | None:
- pass
class LayerWiseDummyScheduler(LRScheduler):
    """
    Placeholder scheduler for layer-wise optimizers such as the GaLoRE optimizer.

    With those optimizers the optimization and scheduling steps are already performed by the post gradient hooks,
    so this class accepts arbitrary args and kwargs and wraps a `LayerWiseDummyOptimizer` built from the kwargs.
    """

    def __init__(self, *args, **kwargs):
        # `lr` is a required keyword: a missing key raises KeyError here.
        self.default_lr = kwargs["lr"]
        dummy_optimizer = LayerWiseDummyOptimizer(**kwargs)
        super().__init__(dummy_optimizer, -1)

    def get_lr(self):
        """Return the learning rates of every param group of every optimizer in `optimizer_dict`, flattened."""
        # Fall back to the default value when there is no optimizer at all.
        if self.optimizer is None:
            return [self.default_lr]
        # we take each lr in the parameters if they exist, assumes the optimizer to be the `LayerWiseDummyOptimizer`
        per_optimizer_lrs = (
            [group["lr"] for group in optim.param_groups] for optim in self.optimizer.optimizer_dict.values()
        )
        return list(chain.from_iterable(per_optimizer_lrs))

    def _get_closed_form_lr(self):
        return self.base_lrs
def set_rng_state_for_device(device_name, device_module, checkpoint_rng_state, is_distributed):
    """
    Helper to set RNG state for a specific device type (CUDA, NPU, MLU, MUSA).

    The state is read from `checkpoint_rng_state` under the lower-cased `device_name` and handed to
    `device_module.random.set_rng_state_all` when `is_distributed` is true, `set_rng_state` otherwise. Any failure
    is logged as an error and not re-raised.
    """
    state_key = device_name.lower()
    err_template = (
        "Didn't manage to set back the RNG states of the {backend} because of the following error:\n"
        " {exception}\n"
        "This won't yield the same results as if the training had not been interrupted."
    )
    try:
        rng = device_module.random
        restore = rng.set_rng_state_all if is_distributed else rng.set_rng_state
        restore(checkpoint_rng_state[state_key])
    except Exception as e:
        # Log error if setting RNG state fails
        logger.error(err_template.format(backend=device_name, exception=e))
def safe_globals():
    """
    Return a context manager that allowlists numpy objects for torch.load with weights_only=True.

    Starting from version 2.4 PyTorch introduces a check for the objects loaded
    with torch.load(weights_only=True). Starting from 2.6 weights_only=True becomes
    a default and requires allowlisting of objects being loaded. For torch releases
    below 2.6 a `contextlib.nullcontext()` is returned instead.
    See: https://github.com/pytorch/pytorch/pull/137602
    See: https://pytorch.org/docs/stable/notes/serialization.html#torch.serialization.add_safe_globals
    See: https://github.com/huggingface/accelerate/pull/3036
    """
    torch_release = version.parse(torch.__version__).release
    if torch_release < version.parse("2.6").release:
        return contextlib.nullcontext()

    if version.parse(np.__version__) >= version.parse("2.0.0"):
        np_core = np._core
    else:
        np_core = np.core
    allowlist = [
        np_core.multiarray._reconstruct,
        np.ndarray,
        np.dtype,
        # numpy >1.25 defines numpy.dtypes.UInt32DType, but below works for
        # all versions of numpy
        type(np.dtype(np.uint32)),
    ]
    return torch.serialization.safe_globals(allowlist)
|