| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942 |
- # Copyright 2019-present, the HuggingFace Inc. team, The Google AI Language Team and Facebook, Inc.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """
- PyTorch DistilBERT model adapted in part from Facebook, Inc XLM model (https://github.com/facebookresearch/XLM) and in
- part from HuggingFace PyTorch version of Google AI Bert model (https://github.com/google-research/bert)
- """
- from collections.abc import Callable
- import numpy as np
- import torch
- from torch import nn
- from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
- from ... import initialization as init
- from ...activations import get_activation
- from ...configuration_utils import PreTrainedConfig
- from ...integrations.deepspeed import is_deepspeed_zero3_enabled
- from ...masking_utils import create_bidirectional_mask
- from ...modeling_layers import GradientCheckpointingLayer
- from ...modeling_outputs import (
- BaseModelOutput,
- MaskedLMOutput,
- MultipleChoiceModelOutput,
- QuestionAnsweringModelOutput,
- SequenceClassifierOutput,
- TokenClassifierOutput,
- )
- from ...modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel
- from ...processing_utils import Unpack
- from ...pytorch_utils import (
- apply_chunking_to_forward,
- )
- from ...utils import (
- TransformersKwargs,
- auto_docstring,
- logging,
- )
- from ...utils.deprecation import deprecate_kwarg
- from ...utils.generic import can_return_tuple, merge_with_config_defaults
- from ...utils.output_capturing import capture_outputs
- from .configuration_distilbert import DistilBertConfig
- logger = logging.get_logger(__name__)
- # UTILS AND BUILDING BLOCKS OF THE ARCHITECTURE #
def create_sinusoidal_embeddings(n_pos: int, dim: int, out: torch.Tensor):
    """
    Fill `out` with sinusoidal position encodings, with special handling for DeepSpeed ZeRO-3.

    Without ZeRO-3 the call is forwarded directly to `_create_sinusoidal_embeddings`. With ZeRO-3 enabled, `out` is
    wrapped in `deepspeed.zero.GatheredParameters` with `modifier_rank=0` and only the process whose
    `torch.distributed` rank is 0 performs the fill.

    Args:
        n_pos (`int`): Number of positions (rows) to encode.
        dim (`int`): Encoding width (columns).
        out (`torch.Tensor`): Tensor that receives the encodings in place.

    Returns:
        The result of `_create_sinusoidal_embeddings`, or `None` on ranks other than 0 when ZeRO-3 is enabled.
    """
    if not is_deepspeed_zero3_enabled():
        return _create_sinusoidal_embeddings(n_pos=n_pos, dim=dim, out=out)

    # Imported lazily: only needed on the ZeRO-3 path.
    import deepspeed

    with deepspeed.zero.GatheredParameters(out, modifier_rank=0):
        if torch.distributed.get_rank() == 0:
            return _create_sinusoidal_embeddings(n_pos=n_pos, dim=dim, out=out)
    return None
- def _create_sinusoidal_embeddings(n_pos: int, dim: int, out: torch.Tensor):
- position_enc = np.array([[pos / np.power(10000, 2 * (j // 2) / dim) for j in range(dim)] for pos in range(n_pos)])
- out.requires_grad = False
- out[:, 0::2] = torch.FloatTensor(np.sin(position_enc[:, 0::2]))
- out[:, 1::2] = torch.FloatTensor(np.cos(position_enc[:, 1::2]))
- out.detach_()
- return out
class Embeddings(nn.Module):
    """
    Input embedding layer: word embeddings plus position embeddings, followed by LayerNorm and dropout.

    A non-persistent `position_ids` buffer of shape `(1, config.max_position_embeddings)` is registered so that
    default position ids can be sliced from it in `forward`.
    """

    def __init__(self, config: PreTrainedConfig):
        super().__init__()
        self.word_embeddings = nn.Embedding(config.vocab_size, config.dim, padding_idx=config.pad_token_id)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.dim)

        self.LayerNorm = nn.LayerNorm(config.dim, eps=1e-12)
        self.dropout = nn.Dropout(config.dropout)
        self.register_buffer(
            "position_ids", torch.arange(config.max_position_embeddings).expand((1, -1)), persistent=False
        )

    @deprecate_kwarg("input_embeds", version="5.6.0", new_name="inputs_embeds")
    def forward(
        self,
        input_ids: torch.Tensor | None,
        inputs_embeds: torch.Tensor | None = None,
        position_ids: torch.LongTensor | None = None,
    ) -> torch.Tensor:
        """
        Embed the inputs and add position information.

        Args:
            input_ids (`torch.Tensor`, *optional*):
                Token ids. When given, they are looked up in `word_embeddings` and take precedence over
                `inputs_embeds`.
            inputs_embeds (`torch.Tensor`, *optional*):
                Pre-computed word embeddings, used only when `input_ids` is `None`.
            position_ids (`torch.LongTensor`, *optional*):
                Position indices. Defaults to the first `seq_length` entries of the registered buffer.

        Returns:
            `torch.Tensor`: Embeddings after LayerNorm and dropout, `(bs, max_seq_length, dim)`.
        """
        if input_ids is not None:
            inputs_embeds = self.word_embeddings(input_ids)  # (bs, max_seq_length, dim)

        seq_length = inputs_embeds.size(1)

        if position_ids is None:
            # Setting the position-ids to the registered buffer in constructor, it helps
            # when tracing the model without passing position-ids, solves
            # issues similar to issue #5664
            if hasattr(self, "position_ids"):
                position_ids = self.position_ids[:, :seq_length]
            else:
                # Derive device and batch size from `inputs_embeds`: `input_ids` may be None on this path.
                position_ids = torch.arange(
                    seq_length, dtype=torch.long, device=inputs_embeds.device
                )  # (max_seq_length)
                position_ids = position_ids.unsqueeze(0).expand(inputs_embeds.size(0), -1)  # (bs, max_seq_length)

        position_embeddings = self.position_embeddings(position_ids)  # (bs, max_seq_length, dim)

        embeddings = inputs_embeds + position_embeddings  # (bs, max_seq_length, dim)
        embeddings = self.LayerNorm(embeddings)  # (bs, max_seq_length, dim)
        embeddings = self.dropout(embeddings)  # (bs, max_seq_length, dim)
        return embeddings
# Adapted from transformers.models.bert.modeling_bert.eager_attention_forward
def eager_attention_forward(
    module: nn.Module,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attention_mask: torch.Tensor | None,
    scaling: float | None = None,
    dropout: float = 0.0,
    **kwargs: Unpack[TransformersKwargs],
):
    """
    Scaled dot-product attention written with plain tensor ops.

    Args:
        module (`nn.Module`): Only its `training` flag is read, to decide whether dropout is active.
        query, key, value (`torch.Tensor`): Projected inputs; `key` is transposed on dims 2 and 3 before the product.
        attention_mask (`torch.Tensor`, *optional*): Added to the raw scores before the softmax when not `None`.
        scaling (`float`, *optional*): Score multiplier. Defaults to `query.size(-1) ** -0.5`.
        dropout (`float`): Dropout probability applied to the attention probabilities.

    Returns:
        `tuple`: The attended values with dims 1 and 2 swapped (made contiguous), and the attention probabilities
        after dropout.
    """
    scale = query.size(-1) ** -0.5 if scaling is None else scaling

    # Take the dot product between "query" and "key" to get the raw attention scores.
    scores = torch.matmul(query, key.transpose(2, 3)) * scale
    if attention_mask is not None:
        scores = scores + attention_mask

    probs = nn.functional.softmax(scores, dim=-1)
    probs = nn.functional.dropout(probs, p=dropout, training=module.training)

    context = torch.matmul(probs, value).transpose(1, 2).contiguous()
    return context, probs
class DistilBertSelfAttention(nn.Module):
    """
    Multi-head self-attention with separate query/key/value/output linear projections.

    The attention computation itself is delegated to the function selected by `config._attn_implementation`,
    falling back to `eager_attention_forward`.
    """

    def __init__(self, config: PreTrainedConfig):
        super().__init__()
        self.config = config

        self.n_heads = config.n_heads
        self.dim = config.dim
        self.attention_head_size = self.dim // self.n_heads
        self.scaling = self.attention_head_size**-0.5

        # The hidden size has to split evenly across heads.
        if self.dim % self.n_heads != 0:
            raise ValueError(f"self.n_heads: {self.n_heads} must divide self.dim: {self.dim} evenly")

        width = config.dim
        self.q_lin = nn.Linear(in_features=width, out_features=width)
        self.k_lin = nn.Linear(in_features=width, out_features=width)
        self.v_lin = nn.Linear(in_features=width, out_features=width)
        self.out_lin = nn.Linear(in_features=width, out_features=width)

        self.dropout = nn.Dropout(p=config.attention_dropout)
        self.is_causal = False

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.FloatTensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> tuple[torch.Tensor]:
        """
        Run self-attention over `hidden_states`.

        Returns:
            `tuple`: The output of `out_lin` with the leading dims of `hidden_states` and the heads merged into the
            last dim, plus the attention weights returned by the selected attention function.
        """
        leading_dims = hidden_states.shape[:-1]
        per_head_shape = (*leading_dims, -1, self.attention_head_size)

        def project(linear: nn.Linear) -> torch.Tensor:
            # Project, split the last dim into heads, then move the head dim in front of the sequence dim.
            return linear(hidden_states).view(*per_head_shape).transpose(1, 2)

        query_layer = project(self.q_lin)
        key_layer = project(self.k_lin)
        value_layer = project(self.v_lin)

        attention_interface: Callable = ALL_ATTENTION_FUNCTIONS.get_interface(
            self.config._attn_implementation, eager_attention_forward
        )

        # Attention dropout is only active in training mode.
        dropout_p = self.dropout.p if self.training else 0.0
        attn_output, attn_weights = attention_interface(
            self,
            query_layer,
            key_layer,
            value_layer,
            attention_mask,
            dropout=dropout_p,
            scaling=self.scaling,
            **kwargs,
        )

        merged = attn_output.reshape(*leading_dims, -1).contiguous()
        return self.out_lin(merged), attn_weights
class FFN(nn.Module):
    """
    Position-wise feed-forward block: `lin1` -> activation -> `lin2` -> dropout.

    The computation is routed through `apply_chunking_to_forward` using `config.chunk_size_feed_forward` along
    dimension 1.
    """

    def __init__(self, config: PreTrainedConfig):
        super().__init__()
        self.dropout = nn.Dropout(p=config.dropout)
        self.chunk_size_feed_forward = config.chunk_size_feed_forward
        self.seq_len_dim = 1
        self.lin1 = nn.Linear(in_features=config.dim, out_features=config.hidden_dim)
        self.lin2 = nn.Linear(in_features=config.hidden_dim, out_features=config.dim)
        self.activation = get_activation(config.activation)

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        """Apply `ff_chunk` to `input` via `apply_chunking_to_forward`."""
        return apply_chunking_to_forward(
            self.ff_chunk,
            self.chunk_size_feed_forward,
            self.seq_len_dim,
            input,
        )

    def ff_chunk(self, input: torch.Tensor) -> torch.Tensor:
        """Feed-forward computation for one chunk."""
        expanded = self.activation(self.lin1(input))
        return self.dropout(self.lin2(expanded))
class TransformerBlock(GradientCheckpointingLayer):
    """
    One encoder layer: self-attention and a feed-forward network, each followed by a residual connection and
    LayerNorm.
    """

    def __init__(self, config: PreTrainedConfig):
        super().__init__()

        # The hidden size has to split evenly across heads.
        if config.dim % config.n_heads != 0:
            raise ValueError(f"config.n_heads {config.n_heads} must divide config.dim {config.dim} evenly")

        self.attention = DistilBertSelfAttention(config)
        self.sa_layer_norm = nn.LayerNorm(normalized_shape=config.dim, eps=1e-12)

        self.ffn = FFN(config)
        self.output_layer_norm = nn.LayerNorm(normalized_shape=config.dim, eps=1e-12)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> tuple[torch.Tensor, ...]:
        """
        Apply the layer to `hidden_states`.

        The attention weights returned by the attention module are discarded here; the value returned is the
        output of `output_layer_norm`.
        """
        # Self-attention sub-layer with residual connection.
        attn_out, _ = self.attention(
            hidden_states,
            attention_mask=attention_mask,
            **kwargs,
        )
        normed_attn = self.sa_layer_norm(attn_out + hidden_states)

        # Feed-forward sub-layer with residual connection.
        ffn_out = self.ffn(normed_attn)
        return self.output_layer_norm(ffn_out + normed_attn)
class Transformer(nn.Module):
    """Stack of `config.n_layers` `TransformerBlock` layers applied one after another."""

    def __init__(self, config: PreTrainedConfig):
        super().__init__()
        self.n_layers = config.n_layers
        blocks = [TransformerBlock(config) for _ in range(config.n_layers)]
        self.layer = nn.ModuleList(blocks)
        self.gradient_checkpointing = False

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> BaseModelOutput:
        """Feed `hidden_states` through every layer and wrap the final result in a `BaseModelOutput`."""
        current = hidden_states
        for block in self.layer:
            current = block(current, attention_mask, **kwargs)
        return BaseModelOutput(last_hidden_state=current)
- # INTERFACE FOR ENCODER AND TASK SPECIFIC MODEL #
# Shared base class for the DistilBERT models in this file: declares the supported attention backends, which
# modules supply recorded hidden states / attentions, and the weight initialisation.
@auto_docstring
class DistilBertPreTrainedModel(PreTrainedModel):
    config: DistilBertConfig
    base_model_prefix = "distilbert"
    supports_gradient_checkpointing = True
    _supports_flash_attn = True
    _supports_sdpa = True
    _supports_flex_attn = True
    _supports_attention_backend = True
    _can_record_outputs = {
        "hidden_states": TransformerBlock,
        "attentions": DistilBertSelfAttention,
    }

    @torch.no_grad()
    def _init_weights(self, module: nn.Module):
        """Initialize the weights."""
        super()._init_weights(module)

        # Only the embedding layer needs anything beyond the parent's initialisation.
        if not isinstance(module, Embeddings):
            return

        if self.config.sinusoidal_pos_embds:
            # Build the sinusoidal table in a scratch tensor, then copy it into the position embedding weight.
            scratch = torch.empty_like(module.position_embeddings.weight)
            sinusoids = create_sinusoidal_embeddings(
                self.config.max_position_embeddings,
                self.config.dim,
                scratch,
            )
            init.copy_(module.position_embeddings.weight, sinusoids)

        # Restore the position id buffer to 0..N-1.
        n_positions = module.position_ids.shape[-1]
        init.copy_(module.position_ids, torch.arange(n_positions).expand((1, -1)))
# Bare DistilBERT encoder: `Embeddings` followed by the `Transformer` stack.
@auto_docstring
class DistilBertModel(DistilBertPreTrainedModel):
    def __init__(self, config: PreTrainedConfig):
        super().__init__(config)

        self.embeddings = Embeddings(config)  # Embeddings
        self.transformer = Transformer(config)  # Encoder

        # Initialize weights and apply final processing
        self.post_init()

    def get_position_embeddings(self) -> nn.Embedding:
        """
        Returns the position embeddings
        """
        return self.embeddings.position_embeddings

    def resize_position_embeddings(self, new_num_position_embeddings: int):
        """
        Resizes position embeddings of the model if `new_num_position_embeddings != config.max_position_embeddings`.

        Arguments:
            new_num_position_embeddings (`int`):
                The number of new position embedding matrix. If position embeddings are learned, increasing the size
                will add newly initialized vectors at the end, whereas reducing the size will remove vectors from the
                end. If position embeddings are not learned (*e.g.* sinusoidal position embeddings), increasing the
                size will add correct vectors at the end following the position encoding algorithm, whereas reducing
                the size will remove vectors from the end.
        """
        num_position_embeds_diff = new_num_position_embeddings - self.config.max_position_embeddings

        # no resizing needs to be done if the length stays the same
        if num_position_embeds_diff == 0:
            return

        logger.info(f"Setting `config.max_position_embeddings={new_num_position_embeddings}`...")
        self.config.max_position_embeddings = new_num_position_embeddings

        # Detached snapshot of the current table; it is only used as data to copy from.
        old_position_embeddings_weight = self.embeddings.position_embeddings.weight.detach().clone()

        self.embeddings.position_embeddings = nn.Embedding(self.config.max_position_embeddings, self.config.dim)

        if self.config.sinusoidal_pos_embds:
            # The position embedding lives on `self.embeddings`; this model has no `position_embeddings` attribute.
            create_sinusoidal_embeddings(
                n_pos=self.config.max_position_embeddings,
                dim=self.config.dim,
                out=self.embeddings.position_embeddings.weight,
            )
        else:
            with torch.no_grad():
                if num_position_embeds_diff > 0:
                    # Growing: keep the old rows, the trailing rows stay freshly initialised.
                    self.embeddings.position_embeddings.weight[:-num_position_embeds_diff] = (
                        old_position_embeddings_weight
                    )
                else:
                    # Shrinking: keep only the leading rows of the old table.
                    self.embeddings.position_embeddings.weight = nn.Parameter(
                        old_position_embeddings_weight[:num_position_embeds_diff].clone()
                    )
        # move position_embeddings to correct device
        self.embeddings.position_embeddings.to(self.device)

        # `Embeddings.forward` slices default position ids out of this buffer, so it has to cover the new
        # maximum length as well. Assigning to an existing buffer name keeps it registered as non-persistent.
        self.embeddings.position_ids = torch.arange(
            self.config.max_position_embeddings, device=self.embeddings.position_ids.device
        ).expand((1, -1))

    def get_input_embeddings(self) -> nn.Embedding:
        return self.embeddings.word_embeddings

    def set_input_embeddings(self, new_embeddings: nn.Embedding):
        self.embeddings.word_embeddings = new_embeddings

    @merge_with_config_defaults
    @capture_outputs
    @auto_docstring
    def forward(
        self,
        input_ids: torch.Tensor | None = None,
        attention_mask: torch.Tensor | None = None,
        inputs_embeds: torch.Tensor | None = None,
        position_ids: torch.Tensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> BaseModelOutput | tuple[torch.Tensor, ...]:
        r"""
        input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
            Indices of input sequence tokens in the vocabulary.

            Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
            [`PreTrainedTokenizer.__call__`] for details.

            [What are input IDs?](../glossary#input-ids)
        inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
            Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This
            is useful if you want more control over how to convert `input_ids` indices into associated vectors than the
            model's internal embedding lookup matrix.
        """
        # Exactly one of the two input forms must be provided.
        if (input_ids is None) ^ (inputs_embeds is not None):
            raise ValueError("You must specify exactly one of input_ids or inputs_embeds")

        embeddings = self.embeddings(input_ids, inputs_embeds, position_ids)

        attention_mask = create_bidirectional_mask(
            config=self.config,
            inputs_embeds=embeddings,
            attention_mask=attention_mask,
        )

        return self.transformer(
            hidden_states=embeddings,
            attention_mask=attention_mask,
            **kwargs,
        )
@auto_docstring(
    custom_intro="""
    DistilBert Model with a `masked language modeling` head on top.
    """
)
class DistilBertForMaskedLM(DistilBertPreTrainedModel):
    _tied_weights_keys = {"vocab_projector.weight": "distilbert.embeddings.word_embeddings.weight"}

    def __init__(self, config: PreTrainedConfig):
        super().__init__(config)

        self.activation = get_activation(config.activation)

        self.distilbert = DistilBertModel(config)
        # MLM head: dense -> activation -> LayerNorm -> projection to the vocabulary.
        self.vocab_transform = nn.Linear(config.dim, config.dim)
        self.vocab_layer_norm = nn.LayerNorm(config.dim, eps=1e-12)
        self.vocab_projector = nn.Linear(config.dim, config.vocab_size)

        # Initialize weights and apply final processing
        self.post_init()

        self.mlm_loss_fct = nn.CrossEntropyLoss()

    def get_position_embeddings(self) -> nn.Embedding:
        """
        Returns the position embeddings
        """
        return self.distilbert.get_position_embeddings()

    def resize_position_embeddings(self, new_num_position_embeddings: int):
        """
        Resizes position embeddings of the model if `new_num_position_embeddings != config.max_position_embeddings`.

        Arguments:
            new_num_position_embeddings (`int`):
                The number of new position embedding matrix. If position embeddings are learned, increasing the size
                will add newly initialized vectors at the end, whereas reducing the size will remove vectors from the
                end. If position embeddings are not learned (*e.g.* sinusoidal position embeddings), increasing the
                size will add correct vectors at the end following the position encoding algorithm, whereas reducing
                the size will remove vectors from the end.
        """
        self.distilbert.resize_position_embeddings(new_num_position_embeddings)

    def get_output_embeddings(self) -> nn.Module:
        return self.vocab_projector

    def set_output_embeddings(self, new_embeddings: nn.Module):
        self.vocab_projector = new_embeddings

    @can_return_tuple
    @auto_docstring
    def forward(
        self,
        input_ids: torch.Tensor | None = None,
        attention_mask: torch.Tensor | None = None,
        inputs_embeds: torch.Tensor | None = None,
        labels: torch.LongTensor | None = None,
        position_ids: torch.Tensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> MaskedLMOutput | tuple[torch.Tensor, ...]:
        r"""
        input_ids (`torch.LongTensor` of shape `(batch_size, num_choices)`):
            Indices of input sequence tokens in the vocabulary.

            Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
            [`PreTrainedTokenizer.__call__`] for details.

            [What are input IDs?](../glossary#input-ids)
        inputs_embeds (`torch.FloatTensor` of shape `(batch_size, num_choices, hidden_size)`, *optional*):
            Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This
            is useful if you want more control over how to convert `input_ids` indices into associated vectors than the
            model's internal embedding lookup matrix.
        labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Labels for computing the masked language modeling loss. Indices should be in `[-100, 0, ...,
            config.vocab_size]` (see `input_ids` docstring) Tokens with indices set to `-100` are ignored (masked), the
            loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.
        """
        encoder_out = self.distilbert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            inputs_embeds=inputs_embeds,
            position_ids=position_ids,
            return_dict=True,
            **kwargs,
        )
        sequence_output = encoder_out[0]  # (bs, seq_length, dim)

        transformed = self.vocab_transform(sequence_output)  # (bs, seq_length, dim)
        transformed = self.vocab_layer_norm(self.activation(transformed))  # (bs, seq_length, dim)
        prediction_logits = self.vocab_projector(transformed)  # (bs, seq_length, vocab_size)

        if labels is None:
            mlm_loss = None
        else:
            # Flatten to (bs * seq_length, vocab_size) vs (bs * seq_length,) for the cross-entropy.
            flat_logits = prediction_logits.view(-1, prediction_logits.size(-1))
            mlm_loss = self.mlm_loss_fct(flat_logits, labels.view(-1))

        return MaskedLMOutput(
            loss=mlm_loss,
            logits=prediction_logits,
            hidden_states=encoder_out.hidden_states,
            attentions=encoder_out.attentions,
        )
@auto_docstring(
    custom_intro="""
    DistilBert Model transformer with a sequence classification/regression head on top (a linear layer on top of the
    pooled output) e.g. for GLUE tasks.
    """
)
class DistilBertForSequenceClassification(DistilBertPreTrainedModel):
    def __init__(self, config: PreTrainedConfig):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.distilbert = DistilBertModel(config)
        self.pre_classifier = nn.Linear(config.dim, config.dim)
        self.classifier = nn.Linear(config.dim, config.num_labels)
        self.dropout = nn.Dropout(config.seq_classif_dropout)

        # Initialize weights and apply final processing
        self.post_init()

    def get_position_embeddings(self) -> nn.Embedding:
        """
        Returns the position embeddings
        """
        return self.distilbert.get_position_embeddings()

    def resize_position_embeddings(self, new_num_position_embeddings: int):
        """
        Resizes position embeddings of the model if `new_num_position_embeddings != config.max_position_embeddings`.

        Arguments:
            new_num_position_embeddings (`int`):
                The number of new position embedding matrix. If position embeddings are learned, increasing the size
                will add newly initialized vectors at the end, whereas reducing the size will remove vectors from the
                end. If position embeddings are not learned (*e.g.* sinusoidal position embeddings), increasing the
                size will add correct vectors at the end following the position encoding algorithm, whereas reducing
                the size will remove vectors from the end.
        """
        self.distilbert.resize_position_embeddings(new_num_position_embeddings)

    @can_return_tuple
    @auto_docstring
    def forward(
        self,
        input_ids: torch.Tensor | None = None,
        attention_mask: torch.Tensor | None = None,
        inputs_embeds: torch.Tensor | None = None,
        labels: torch.LongTensor | None = None,
        position_ids: torch.Tensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> SequenceClassifierOutput | tuple[torch.Tensor, ...]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        encoder_out = self.distilbert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            inputs_embeds=inputs_embeds,
            position_ids=position_ids,
            return_dict=True,
            **kwargs,
        )
        # Pool by taking the hidden state of the first token.
        pooled = encoder_out[0][:, 0]  # (bs, dim)
        pooled = nn.functional.relu(self.pre_classifier(pooled))  # (bs, dim)
        pooled = self.dropout(pooled)  # (bs, dim)
        logits = self.classifier(pooled)  # (bs, num_labels)

        loss = None
        if labels is not None:
            # Infer the problem type once and remember it on the config.
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    inferred = "regression"
                elif self.num_labels > 1 and labels.dtype in (torch.long, torch.int):
                    inferred = "single_label_classification"
                else:
                    inferred = "multi_label_classification"
                self.config.problem_type = inferred

            problem_type = self.config.problem_type
            if problem_type == "regression":
                criterion = MSELoss()
                if self.num_labels == 1:
                    loss = criterion(logits.squeeze(), labels.squeeze())
                else:
                    loss = criterion(logits, labels)
            elif problem_type == "single_label_classification":
                loss = CrossEntropyLoss()(logits.view(-1, self.num_labels), labels.view(-1))
            elif problem_type == "multi_label_classification":
                loss = BCEWithLogitsLoss()(logits, labels)

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=encoder_out.hidden_states,
            attentions=encoder_out.attentions,
        )
# DistilBERT encoder with a linear span head producing start and end logits for every position.
@auto_docstring
class DistilBertForQuestionAnswering(DistilBertPreTrainedModel):
    def __init__(self, config: PreTrainedConfig):
        super().__init__(config)

        self.distilbert = DistilBertModel(config)
        self.qa_outputs = nn.Linear(config.dim, config.num_labels)
        # The head is split into exactly two outputs (start / end) in `forward`.
        if config.num_labels != 2:
            raise ValueError(f"config.num_labels should be 2, but it is {config.num_labels}")

        self.dropout = nn.Dropout(config.qa_dropout)

        # Initialize weights and apply final processing
        self.post_init()

    def get_position_embeddings(self) -> nn.Embedding:
        """
        Returns the position embeddings
        """
        return self.distilbert.get_position_embeddings()

    def resize_position_embeddings(self, new_num_position_embeddings: int):
        """
        Resizes position embeddings of the model if `new_num_position_embeddings != config.max_position_embeddings`.

        Arguments:
            new_num_position_embeddings (`int`):
                The number of new position embedding matrix. If position embeddings are learned, increasing the size
                will add newly initialized vectors at the end, whereas reducing the size will remove vectors from the
                end. If position embeddings are not learned (*e.g.* sinusoidal position embeddings), increasing the
                size will add correct vectors at the end following the position encoding algorithm, whereas reducing
                the size will remove vectors from the end.
        """
        self.distilbert.resize_position_embeddings(new_num_position_embeddings)

    @can_return_tuple
    @auto_docstring
    def forward(
        self,
        input_ids: torch.Tensor | None = None,
        attention_mask: torch.Tensor | None = None,
        inputs_embeds: torch.Tensor | None = None,
        start_positions: torch.Tensor | None = None,
        end_positions: torch.Tensor | None = None,
        position_ids: torch.Tensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> QuestionAnsweringModelOutput | tuple[torch.Tensor, ...]:
        r"""
        input_ids (`torch.LongTensor` of shape `(batch_size, num_choices)`):
            Indices of input sequence tokens in the vocabulary.

            Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
            [`PreTrainedTokenizer.__call__`] for details.

            [What are input IDs?](../glossary#input-ids)
        inputs_embeds (`torch.FloatTensor` of shape `(batch_size, num_choices, hidden_size)`, *optional*):
            Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This
            is useful if you want more control over how to convert `input_ids` indices into associated vectors than the
            model's internal embedding lookup matrix.
        """
        encoder_out = self.distilbert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            inputs_embeds=inputs_embeds,
            position_ids=position_ids,
            return_dict=True,
            **kwargs,
        )
        sequence_output = self.dropout(encoder_out[0])  # (bs, max_query_len, dim)
        logits = self.qa_outputs(sequence_output)  # (bs, max_query_len, 2)

        # Split the last dim into its two channels, each squeezed to (bs, max_query_len).
        start_logits, end_logits = (piece.squeeze(-1).contiguous() for piece in logits.split(1, dim=-1))

        total_loss = None
        if start_positions is not None and end_positions is not None:
            # If we are on multi-GPU, split add a dimension
            if start_positions.dim() > 1:
                start_positions = start_positions.squeeze(-1)
            if end_positions.dim() > 1:
                end_positions = end_positions.squeeze(-1)

            # sometimes the start/end positions are outside our model inputs, we ignore these terms
            ignored_index = start_logits.size(1)
            start_positions = start_positions.clamp(0, ignored_index)
            end_positions = end_positions.clamp(0, ignored_index)

            span_criterion = nn.CrossEntropyLoss(ignore_index=ignored_index)
            start_loss = span_criterion(start_logits, start_positions)
            end_loss = span_criterion(end_logits, end_positions)
            total_loss = (start_loss + end_loss) / 2

        return QuestionAnsweringModelOutput(
            loss=total_loss,
            start_logits=start_logits,
            end_logits=end_logits,
            hidden_states=encoder_out.hidden_states,
            attentions=encoder_out.attentions,
        )
# DistilBERT encoder with dropout and a per-token linear classifier on top of the last hidden state.
@auto_docstring
class DistilBertForTokenClassification(DistilBertPreTrainedModel):
    def __init__(self, config: PreTrainedConfig):
        super().__init__(config)
        self.num_labels = config.num_labels

        self.distilbert = DistilBertModel(config)
        self.dropout = nn.Dropout(config.dropout)
        # Use `config.dim`, the same width attribute the encoder and every other head in this file read.
        self.classifier = nn.Linear(config.dim, config.num_labels)

        # Initialize weights and apply final processing
        self.post_init()

    def get_position_embeddings(self) -> nn.Embedding:
        """
        Returns the position embeddings
        """
        return self.distilbert.get_position_embeddings()

    def resize_position_embeddings(self, new_num_position_embeddings: int):
        """
        Resizes position embeddings of the model if `new_num_position_embeddings != config.max_position_embeddings`.

        Arguments:
            new_num_position_embeddings (`int`):
                The number of new position embedding matrix. If position embeddings are learned, increasing the size
                will add newly initialized vectors at the end, whereas reducing the size will remove vectors from the
                end. If position embeddings are not learned (*e.g.* sinusoidal position embeddings), increasing the
                size will add correct vectors at the end following the position encoding algorithm, whereas reducing
                the size will remove vectors from the end.
        """
        self.distilbert.resize_position_embeddings(new_num_position_embeddings)

    @can_return_tuple
    @auto_docstring
    def forward(
        self,
        input_ids: torch.Tensor | None = None,
        attention_mask: torch.Tensor | None = None,
        inputs_embeds: torch.Tensor | None = None,
        labels: torch.LongTensor | None = None,
        position_ids: torch.Tensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> TokenClassifierOutput | tuple[torch.Tensor, ...]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`.
        """
        # Keyword call, matching how the other task heads invoke the encoder.
        outputs = self.distilbert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            inputs_embeds=inputs_embeds,
            position_ids=position_ids,
            return_dict=True,
            **kwargs,
        )

        sequence_output = outputs[0]

        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            # Flatten tokens so every position is one classification example.
            loss_fct = CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
@auto_docstring
class DistilBertForMultipleChoice(DistilBertPreTrainedModel):
    # DistilBERT backbone with a multiple-choice head: the hidden state at sequence position 0 of every choice
    # goes through `pre_classifier` -> ReLU -> dropout -> `classifier`, yielding one score per choice.
    def __init__(self, config: PreTrainedConfig):
        super().__init__(config)

        self.distilbert = DistilBertModel(config)
        # Head layers: a `dim -> dim` projection followed by a `dim -> 1` scorer.
        self.pre_classifier = nn.Linear(config.dim, config.dim)
        self.classifier = nn.Linear(config.dim, 1)
        self.dropout = nn.Dropout(config.seq_classif_dropout)

        # Initialize weights and apply final processing
        self.post_init()

    def get_position_embeddings(self) -> nn.Embedding:
        """
        Returns the position embeddings
        """
        return self.distilbert.get_position_embeddings()

    def resize_position_embeddings(self, new_num_position_embeddings: int):
        """
        Resizes position embeddings of the model if `new_num_position_embeddings != config.max_position_embeddings`.

        Arguments:
            new_num_position_embeddings (`int`):
                The number of new position embeddings. If position embeddings are learned, increasing the size will add
                newly initialized vectors at the end, whereas reducing the size will remove vectors from the end. If
                position embeddings are not learned (*e.g.* sinusoidal position embeddings), increasing the size will
                add correct vectors at the end following the position encoding algorithm, whereas reducing the size
                will remove vectors from the end.
        """
        self.distilbert.resize_position_embeddings(new_num_position_embeddings)

    @can_return_tuple
    @auto_docstring
    def forward(
        self,
        input_ids: torch.Tensor | None = None,
        attention_mask: torch.Tensor | None = None,
        inputs_embeds: torch.Tensor | None = None,
        labels: torch.LongTensor | None = None,
        position_ids: torch.Tensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> MultipleChoiceModelOutput | tuple[torch.Tensor, ...]:
        r"""
        input_ids (`torch.LongTensor` of shape `(batch_size, num_choices, sequence_length)`):
            Indices of input sequence tokens in the vocabulary.

            Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
            [`PreTrainedTokenizer.__call__`] for details.

            [What are input IDs?](../glossary#input-ids)
        inputs_embeds (`torch.FloatTensor` of shape `(batch_size, num_choices, sequence_length, hidden_size)`, *optional*):
            Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This
            is useful if you want more control over how to convert `input_ids` indices into associated vectors than the
            model's internal embedding lookup matrix.
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the multiple choice classification loss. Indices should be in `[0, ...,
            num_choices-1]` where `num_choices` is the size of the second dimension of the input tensors. (See
            `input_ids` above)
        position_ids (`torch.LongTensor` of shape `(batch_size, num_choices, sequence_length)`, *optional*):
            Indices of positions of each input sequence tokens in the position embeddings. Position ids given per
            choice (rank 3 or higher) are flattened together with the other inputs; lower-rank position ids are
            passed to the encoder unchanged.

        Examples:

        ```python
        >>> from transformers import AutoTokenizer, DistilBertForMultipleChoice
        >>> import torch

        >>> tokenizer = AutoTokenizer.from_pretrained("distilbert-base-cased")
        >>> model = DistilBertForMultipleChoice.from_pretrained("distilbert-base-cased")

        >>> prompt = "In Italy, pizza served in formal settings, such as at a restaurant, is presented unsliced."
        >>> choice0 = "It is eaten with a fork and a knife."
        >>> choice1 = "It is eaten while held in the hand."
        >>> labels = torch.tensor(0).unsqueeze(0)  # choice0 is correct (according to Wikipedia ;)), batch size 1

        >>> encoding = tokenizer([[prompt, choice0], [prompt, choice1]], return_tensors="pt", padding=True)
        >>> outputs = model(**{k: v.unsqueeze(0) for k, v in encoding.items()}, labels=labels)  # batch size is 1

        >>> # the linear classifier still needs to be trained
        >>> loss = outputs.loss
        >>> logits = outputs.logits
        ```"""
        # Fail with a clear message instead of an AttributeError on `None.shape` below.
        if input_ids is None and inputs_embeds is None:
            raise ValueError("You must specify either input_ids or inputs_embeds")

        num_choices = input_ids.shape[1] if input_ids is not None else inputs_embeds.shape[1]

        # Fold the choice axis into the batch axis so the encoder sees one sequence per row.
        input_ids = input_ids.view(-1, input_ids.size(-1)) if input_ids is not None else None
        attention_mask = attention_mask.view(-1, attention_mask.size(-1)) if attention_mask is not None else None
        inputs_embeds = (
            inputs_embeds.view(-1, inputs_embeds.size(-2), inputs_embeds.size(-1))
            if inputs_embeds is not None
            else None
        )
        # Per-choice position ids must be folded the same way as the other inputs, otherwise they no longer
        # line up with the flattened sequences. Ids of rank <= 2 are forwarded untouched, so callers that
        # already pass flattened or broadcastable ids keep their behavior.
        if position_ids is not None and position_ids.dim() > 2:
            position_ids = position_ids.reshape(-1, position_ids.size(-1))

        outputs = self.distilbert(
            input_ids,
            attention_mask=attention_mask,
            inputs_embeds=inputs_embeds,
            position_ids=position_ids,
            return_dict=True,
            **kwargs,
        )

        hidden_state = outputs[0]  # (bs * num_choices, seq_len, dim)
        pooled_output = hidden_state[:, 0]  # (bs * num_choices, dim)
        pooled_output = self.pre_classifier(pooled_output)  # (bs * num_choices, dim)
        pooled_output = nn.ReLU()(pooled_output)  # (bs * num_choices, dim)
        pooled_output = self.dropout(pooled_output)  # (bs * num_choices, dim)
        logits = self.classifier(pooled_output)  # (bs * num_choices, 1)

        # Regroup the per-sequence scores so each row holds the scores of one example's choices.
        reshaped_logits = logits.view(-1, num_choices)  # (bs, num_choices)

        loss = None
        if labels is not None:
            loss_fct = CrossEntropyLoss()
            loss = loss_fct(reshaped_logits, labels)

        return MultipleChoiceModelOutput(
            loss=loss,
            logits=reshaped_logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
- __all__ = [  # Public API of this module: the names exported by a star-import.
- "DistilBertForMaskedLM",
- "DistilBertForMultipleChoice",
- "DistilBertForQuestionAnswering",
- "DistilBertForSequenceClassification",
- "DistilBertForTokenClassification",
- "DistilBertModel",
- "DistilBertPreTrainedModel",
- ]
|