| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590 |
- import math
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- from ... import initialization as init
- from ...integrations.deepspeed import is_deepspeed_zero3_enabled
- from ...integrations.fsdp import is_fsdp_managed_module
- from ...modeling_layers import GradientCheckpointingLayer
- from ...modeling_outputs import BaseModelOutput, Wav2Vec2BaseModelOutput
- from ...modeling_utils import PreTrainedModel
- from ...utils import logging
- from ..wav2vec2.modeling_wav2vec2 import (
- Wav2Vec2FeatureProjection,
- Wav2Vec2FeedForward,
- Wav2Vec2ForAudioFrameClassification,
- Wav2Vec2ForCTC,
- Wav2Vec2ForSequenceClassification,
- Wav2Vec2ForXVector,
- Wav2Vec2Model,
- Wav2Vec2PositionalConvEmbedding,
- Wav2Vec2PreTrainedModel,
- )
- from .configuration_wavlm import WavLMConfig
- logger = logging.get_logger(__name__)
- class WavLMPositionalConvEmbedding(Wav2Vec2PositionalConvEmbedding):
- pass
- class WavLMFeatureProjection(Wav2Vec2FeatureProjection):
- pass
- class WavLMAttention(nn.Module):
- """Multi-headed attention from 'Attention Is All You Need' paper"""
- def __init__(
- self,
- embed_dim: int,
- num_heads: int,
- dropout: float | int = 0.0,
- num_buckets: int = 320,
- max_distance: int = 800,
- has_relative_position_bias: bool = True,
- ):
- super().__init__()
- self.embed_dim = embed_dim
- self.num_heads = num_heads
- self.dropout = dropout
- self.head_dim = embed_dim // num_heads
- if (self.head_dim * num_heads) != self.embed_dim:
- raise ValueError(
- f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim}"
- f" and `num_heads`: {num_heads})."
- )
- self.scaling = self.head_dim**-0.5
- self.k_proj = nn.Linear(embed_dim, embed_dim)
- self.v_proj = nn.Linear(embed_dim, embed_dim)
- self.q_proj = nn.Linear(embed_dim, embed_dim)
- self.out_proj = nn.Linear(embed_dim, embed_dim)
- self.num_buckets = num_buckets
- self.max_distance = max_distance
- self.gru_rel_pos_const = nn.Parameter(torch.ones(1, self.num_heads, 1, 1))
- self.gru_rel_pos_linear = nn.Linear(self.head_dim, 8)
- if has_relative_position_bias:
- self.rel_attn_embed = nn.Embedding(self.num_buckets, self.num_heads)
- def forward(
- self,
- hidden_states: torch.Tensor,
- attention_mask: torch.Tensor | None = None,
- position_bias: torch.Tensor | None = None,
- output_attentions: bool = False,
- index=0,
- ) -> tuple[torch.Tensor, torch.Tensor | None, tuple[torch.Tensor] | None]:
- """Attention layer with relative attention"""
- bsz, tgt_len, _ = hidden_states.size()
- # first pass of attention layer creates position bias
- if position_bias is None:
- position_bias = self.compute_bias(tgt_len, tgt_len)
- position_bias = (
- position_bias.unsqueeze(0).repeat(bsz, 1, 1, 1).view(bsz * self.num_heads, tgt_len, tgt_len)
- )
- # Compute relative position bias:
- # 1) get reshape hidden_states
- gated_hidden_states = hidden_states.view(hidden_states.shape[:-1] + (self.num_heads, -1))
- gated_hidden_states = gated_hidden_states.permute(0, 2, 1, 3)
- # 2) project hidden states
- relative_position_proj = self.gru_rel_pos_linear(gated_hidden_states)
- relative_position_proj = relative_position_proj.view(gated_hidden_states.shape[:-1] + (2, 4)).sum(-1)
- # 3) compute gate for position bias from projected hidden states
- gate_a, gate_b = torch.sigmoid(relative_position_proj).chunk(2, dim=-1)
- gate_output = gate_a * (gate_b * self.gru_rel_pos_const - 1.0) + 2.0
- # 4) apply gate to position bias to compute gated position_bias
- gated_position_bias = gate_output.view(bsz * self.num_heads, -1, 1) * position_bias
- gated_position_bias = gated_position_bias.view((-1, tgt_len, tgt_len))
- attn_output, attn_weights = self.torch_multi_head_self_attention(
- hidden_states, attention_mask, gated_position_bias, output_attentions
- )
- return attn_output, attn_weights, position_bias
- def torch_multi_head_self_attention(
- self,
- hidden_states: torch.FloatTensor,
- attention_mask: torch.LongTensor | torch.BoolTensor,
- gated_position_bias: torch.FloatTensor,
- output_attentions: bool,
- ) -> tuple[torch.FloatTensor, torch.FloatTensor]:
- """simple wrapper around torch's multi_head_attention_forward function"""
- # self-attention assumes q = k = v
- query = key = value = hidden_states.transpose(0, 1)
- key_padding_mask = attention_mask.ne(1) if attention_mask is not None else None
- # disable bias and add_zero_attn
- bias_k = bias_v = None
- add_zero_attn = False
- # PyTorch 1.3.0 has F.multi_head_attention_forward defined
- # so no problem with backwards compatibility
- attn_output, attn_weights = F.multi_head_attention_forward(
- query,
- key,
- value,
- self.embed_dim,
- self.num_heads,
- torch.empty([0]),
- torch.cat((self.q_proj.bias, self.k_proj.bias, self.v_proj.bias)),
- bias_k,
- bias_v,
- add_zero_attn,
- self.dropout,
- self.out_proj.weight,
- self.out_proj.bias,
- self.training,
- key_padding_mask,
- output_attentions,
- gated_position_bias,
- use_separate_proj_weight=True,
- q_proj_weight=self.q_proj.weight,
- k_proj_weight=self.k_proj.weight,
- v_proj_weight=self.v_proj.weight,
- )
- # [Seq_Len, Batch Size, ...] -> [Batch Size, Seq_Len, ...]
- attn_output = attn_output.transpose(0, 1)
- if attn_weights is not None:
- # IMPORTANT: Attention weights are averaged weights
- # here which should not be the case. This is an open issue
- # on PyTorch: https://github.com/pytorch/pytorch/issues/32590
- attn_weights = attn_weights[:, None].broadcast_to(
- attn_weights.shape[:1] + (self.num_heads,) + attn_weights.shape[1:]
- )
- return attn_output, attn_weights
- def compute_bias(self, query_length: int, key_length: int) -> torch.FloatTensor:
- context_position = torch.arange(query_length, dtype=torch.long)[:, None]
- memory_position = torch.arange(key_length, dtype=torch.long)[None, :]
- relative_position = memory_position - context_position
- relative_position_bucket = self._relative_positions_bucket(relative_position)
- relative_position_bucket = relative_position_bucket.to(self.rel_attn_embed.weight.device)
- values = self.rel_attn_embed(relative_position_bucket)
- values = values.permute([2, 0, 1])
- return values
- def _relative_positions_bucket(self, relative_positions: torch.FloatTensor) -> torch.FloatTensor:
- num_buckets = self.num_buckets // 2
- relative_buckets = (relative_positions > 0).to(torch.long) * num_buckets
- relative_positions = torch.abs(relative_positions)
- max_exact = num_buckets // 2
- is_small = relative_positions < max_exact
- relative_positions_if_large = torch.log(relative_positions.float() / max_exact)
- relative_positions_if_large = relative_positions_if_large / math.log(self.max_distance / max_exact)
- relative_positions_if_large = relative_positions_if_large * (num_buckets - max_exact)
- relative_position_if_large = (max_exact + relative_positions_if_large).to(torch.long)
- relative_position_if_large = torch.min(
- relative_position_if_large, torch.full_like(relative_position_if_large, num_buckets - 1)
- )
- relative_buckets += torch.where(is_small, relative_positions, relative_position_if_large)
- return relative_buckets
- class WavLMFeedForward(Wav2Vec2FeedForward):
- pass
- class WavLMEncoderLayer(GradientCheckpointingLayer):
- def __init__(self, config: WavLMConfig, has_relative_position_bias: bool = True):
- super().__init__()
- self.attention = WavLMAttention(
- embed_dim=config.hidden_size,
- num_heads=config.num_attention_heads,
- dropout=config.attention_dropout,
- num_buckets=config.num_buckets,
- max_distance=config.max_bucket_distance,
- has_relative_position_bias=has_relative_position_bias,
- )
- self.dropout = nn.Dropout(config.hidden_dropout)
- self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- self.feed_forward = WavLMFeedForward(config)
- self.final_layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- def forward(self, hidden_states, attention_mask=None, position_bias=None, output_attentions=False, index=0):
- attn_residual = hidden_states
- hidden_states, attn_weights, position_bias = self.attention(
- hidden_states,
- attention_mask=attention_mask,
- position_bias=position_bias,
- output_attentions=output_attentions,
- index=index,
- )
- hidden_states = self.dropout(hidden_states)
- hidden_states = attn_residual + hidden_states
- hidden_states = self.layer_norm(hidden_states)
- hidden_states = hidden_states + self.feed_forward(hidden_states)
- hidden_states = self.final_layer_norm(hidden_states)
- outputs = (hidden_states, position_bias)
- if output_attentions:
- outputs += (attn_weights,)
- return outputs
- class WavLMEncoderLayerStableLayerNorm(GradientCheckpointingLayer):
- def __init__(self, config: WavLMConfig, has_relative_position_bias: bool = True):
- super().__init__()
- self.attention = WavLMAttention(
- embed_dim=config.hidden_size,
- num_heads=config.num_attention_heads,
- dropout=config.attention_dropout,
- num_buckets=config.num_buckets,
- max_distance=config.max_bucket_distance,
- has_relative_position_bias=has_relative_position_bias,
- )
- self.dropout = nn.Dropout(config.hidden_dropout)
- self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- self.feed_forward = WavLMFeedForward(config)
- self.final_layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- def forward(self, hidden_states, attention_mask=None, position_bias=None, output_attentions=False):
- attn_residual = hidden_states
- hidden_states = self.layer_norm(hidden_states)
- hidden_states, attn_weights, position_bias = self.attention(
- hidden_states,
- attention_mask=attention_mask,
- position_bias=position_bias,
- output_attentions=output_attentions,
- )
- hidden_states = self.dropout(hidden_states)
- hidden_states = attn_residual + hidden_states
- hidden_states = hidden_states + self.feed_forward(self.final_layer_norm(hidden_states))
- outputs = (hidden_states, position_bias)
- if output_attentions:
- outputs += (attn_weights,)
- return outputs
- class WavLMEncoder(nn.Module):
- def __init__(self, config):
- super().__init__()
- self.config = config
- self.pos_conv_embed = WavLMPositionalConvEmbedding(config)
- self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- self.dropout = nn.Dropout(config.hidden_dropout)
- self.layers = nn.ModuleList(
- [WavLMEncoderLayer(config, has_relative_position_bias=(i == 0)) for i in range(config.num_hidden_layers)]
- )
- self.gradient_checkpointing = False
- def forward(
- self,
- hidden_states,
- attention_mask=None,
- output_attentions=False,
- output_hidden_states=False,
- return_dict=True,
- ):
- all_hidden_states = () if output_hidden_states else None
- all_self_attentions = () if output_attentions else None
- if attention_mask is not None:
- # make sure padded tokens output 0
- expand_attention_mask = attention_mask.unsqueeze(-1).repeat(1, 1, hidden_states.shape[2])
- hidden_states[~expand_attention_mask] = 0
- position_embeddings = self.pos_conv_embed(hidden_states)
- hidden_states = hidden_states + position_embeddings
- hidden_states = self.layer_norm(hidden_states)
- hidden_states = self.dropout(hidden_states)
- synced_gpus = is_deepspeed_zero3_enabled() or is_fsdp_managed_module(self)
- position_bias = None
- for i, layer in enumerate(self.layers):
- if output_hidden_states:
- all_hidden_states = all_hidden_states + (hidden_states,)
- # add LayerDrop (see https://huggingface.co/papers/1909.11556 for description)
- dropout_probability = torch.rand([])
- skip_the_layer = self.training and i > 0 and (dropout_probability < self.config.layerdrop)
- if not skip_the_layer or synced_gpus:
- # under fsdp or deepspeed zero3 all gpus must run in sync
- layer_outputs = layer(
- hidden_states,
- attention_mask=attention_mask,
- position_bias=position_bias,
- output_attentions=output_attentions,
- index=i,
- )
- hidden_states, position_bias = layer_outputs[:2]
- if skip_the_layer:
- layer_outputs = (None, None, None)
- if output_attentions:
- all_self_attentions = all_self_attentions + (layer_outputs[2],)
- if output_hidden_states:
- all_hidden_states = all_hidden_states + (hidden_states,)
- if not return_dict:
- return tuple(v for v in [hidden_states, all_hidden_states, all_self_attentions] if v is not None)
- return BaseModelOutput(
- last_hidden_state=hidden_states,
- hidden_states=all_hidden_states,
- attentions=all_self_attentions,
- )
- class WavLMEncoderStableLayerNorm(nn.Module):
- def __init__(self, config):
- super().__init__()
- self.config = config
- self.pos_conv_embed = WavLMPositionalConvEmbedding(config)
- self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- self.dropout = nn.Dropout(config.hidden_dropout)
- self.layers = nn.ModuleList(
- [
- WavLMEncoderLayerStableLayerNorm(config, has_relative_position_bias=(i == 0))
- for i in range(config.num_hidden_layers)
- ]
- )
- self.gradient_checkpointing = False
- def forward(
- self,
- hidden_states,
- attention_mask=None,
- output_attentions=False,
- output_hidden_states=False,
- return_dict=True,
- ):
- all_hidden_states = () if output_hidden_states else None
- all_self_attentions = () if output_attentions else None
- if attention_mask is not None:
- # make sure padded tokens are not attended to
- expand_attention_mask = attention_mask.unsqueeze(-1).repeat(1, 1, hidden_states.shape[2])
- hidden_states[~expand_attention_mask] = 0
- position_embeddings = self.pos_conv_embed(hidden_states)
- hidden_states = hidden_states + position_embeddings
- hidden_states = self.dropout(hidden_states)
- synced_gpus = is_deepspeed_zero3_enabled() or is_fsdp_managed_module(self)
- position_bias = None
- for i, layer in enumerate(self.layers):
- if output_hidden_states:
- all_hidden_states = all_hidden_states + (hidden_states,)
- # add LayerDrop (see https://huggingface.co/papers/1909.11556 for description)
- dropout_probability = torch.rand([])
- skip_the_layer = self.training and i > 0 and (dropout_probability < self.config.layerdrop)
- if not skip_the_layer or synced_gpus:
- # under fsdp or deepspeed zero3 all gpus must run in sync
- # XXX: could optimize this like synced_gpus in generate_utils but not sure if it's worth the code complication
- layer_outputs = layer(
- hidden_states,
- attention_mask=attention_mask,
- output_attentions=output_attentions,
- position_bias=position_bias,
- )
- hidden_states, position_bias = layer_outputs[:2]
- if skip_the_layer:
- layer_outputs = (None, None, None)
- if output_attentions:
- all_self_attentions = all_self_attentions + (layer_outputs[2],)
- hidden_states = self.layer_norm(hidden_states)
- if output_hidden_states:
- all_hidden_states = all_hidden_states + (hidden_states,)
- if not return_dict:
- return tuple(v for v in [hidden_states, all_hidden_states, all_self_attentions] if v is not None)
- return BaseModelOutput(
- last_hidden_state=hidden_states, hidden_states=all_hidden_states, attentions=all_self_attentions
- )
- class WavLMGumbelVectorQuantizer(nn.Module):
- """
- Vector quantization using gumbel softmax. See [CATEGORICAL REPARAMETERIZATION WITH
- GUMBEL-SOFTMAX](https://huggingface.co/papers/1611.01144) for more information.
- """
- def __init__(self, config):
- super().__init__()
- self.num_groups = config.num_codevector_groups
- self.num_vars = config.num_codevectors_per_group
- if config.codevector_dim % self.num_groups != 0:
- raise ValueError(
- f"`config.codevector_dim {config.codevector_dim} must be divisible"
- f" by `config.num_codevector_groups` {self.num_groups} "
- "for concatenation."
- )
- # storage for codebook variables (codewords)
- self.codevectors = nn.Parameter(
- torch.FloatTensor(1, self.num_groups * self.num_vars, config.codevector_dim // self.num_groups)
- )
- self.weight_proj = nn.Linear(config.conv_dim[-1], self.num_groups * self.num_vars)
- # can be decayed for training
- self.temperature = 2
- @staticmethod
- def _compute_perplexity(probs):
- marginal_probs = probs.mean(dim=0)
- perplexity = torch.exp(-torch.sum(torch.xlogy(marginal_probs, marginal_probs), dim=-1)).sum()
- return perplexity
- def forward(self, hidden_states):
- batch_size, sequence_length, hidden_size = hidden_states.shape
- # project to codevector dim
- hidden_states = self.weight_proj(hidden_states)
- hidden_states = hidden_states.view(batch_size * sequence_length * self.num_groups, -1)
- if self.training:
- # sample code vector probs via gumbel in differentiateable way
- codevector_probs = nn.functional.gumbel_softmax(hidden_states.float(), tau=self.temperature, hard=True)
- codevector_probs = codevector_probs.type_as(hidden_states)
- # compute perplexity
- codevector_soft_dist = torch.softmax(
- hidden_states.view(batch_size * sequence_length, self.num_groups, -1).float(), dim=-1
- )
- perplexity = self._compute_perplexity(codevector_soft_dist)
- else:
- # take argmax in non-differentiable way
- # comptute hard codevector distribution (one hot)
- codevector_idx = hidden_states.argmax(dim=-1)
- codevector_probs = hidden_states.new_zeros(*hidden_states.shape).scatter_(
- -1, codevector_idx.view(-1, 1), 1.0
- )
- codevector_probs = codevector_probs.view(batch_size * sequence_length, self.num_groups, -1)
- perplexity = self._compute_perplexity(codevector_probs)
- codevector_probs = codevector_probs.view(batch_size * sequence_length, -1)
- # use probs to retrieve codevectors
- codevectors_per_group = codevector_probs.unsqueeze(-1) * self.codevectors
- codevectors = codevectors_per_group.view(batch_size * sequence_length, self.num_groups, self.num_vars, -1)
- codevectors = codevectors.sum(-2).view(batch_size, sequence_length, -1)
- return codevectors, perplexity
- class WavLMPreTrainedModel(PreTrainedModel, Wav2Vec2PreTrainedModel):
- config: WavLMConfig
- base_model_prefix = "wavlm"
- main_input_name = "input_values"
- input_modalities = "audio"
- supports_gradient_checkpointing = True
- _supports_flash_attn = False
- _supports_sdpa = False
- _supports_flex_attn = False
- @torch.no_grad()
- def _init_weights(self, module):
- """Initialize the weights"""
- # gumbel softmax requires special init
- if isinstance(module, WavLMGumbelVectorQuantizer):
- init.normal_(module.weight_proj.weight, mean=0.0, std=1)
- init.zeros_(module.weight_proj.bias)
- init.uniform_(module.codevectors)
- elif isinstance(module, WavLMPositionalConvEmbedding):
- init.normal_(
- module.conv.weight,
- mean=0,
- std=2 * math.sqrt(1 / (module.conv.kernel_size[0] * module.conv.in_channels)),
- )
- init.constant_(module.conv.bias, 0)
- elif isinstance(module, WavLMFeatureProjection):
- k = math.sqrt(1 / module.projection.in_features)
- init.uniform_(module.projection.weight, a=-k, b=k)
- init.uniform_(module.projection.bias, a=-k, b=k)
- elif isinstance(module, nn.Linear):
- init.normal_(module.weight, mean=0.0, std=self.config.initializer_range)
- if module.bias is not None:
- init.zeros_(module.bias)
- elif isinstance(module, (nn.LayerNorm, nn.GroupNorm)):
- init.zeros_(module.bias)
- init.ones_(module.weight)
- elif isinstance(module, nn.Conv1d):
- init.kaiming_normal_(module.weight)
- if module.bias is not None:
- k = math.sqrt(module.groups / (module.in_channels * module.kernel_size[0]))
- init.uniform_(module.bias, a=-k, b=k)
- def _get_adapters(self):
- raise AttributeError("Not needed for WavLM")
- def init_adapter_layers(self):
- raise AttributeError("Not needed for WavLM")
- def load_adapter(self):
- raise AttributeError("Not needed for WavLM")
- WavLMBaseModelOutput = Wav2Vec2BaseModelOutput
- class WavLMModel(Wav2Vec2Model):
- pass
- class WavLMForCTC(Wav2Vec2ForCTC):
- pass
- class WavLMForSequenceClassification(Wav2Vec2ForSequenceClassification):
- pass
- class WavLMForAudioFrameClassification(Wav2Vec2ForAudioFrameClassification):
- pass
- class WavLMForXVector(Wav2Vec2ForXVector):
- pass
- __all__ = [
- "WavLMForAudioFrameClassification",
- "WavLMForCTC",
- "WavLMForSequenceClassification",
- "WavLMForXVector",
- "WavLMModel",
- "WavLMPreTrainedModel",
- ]
|