| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185 |
- # Copyright 2022 Intel Labs, OpenMMLab and The HuggingFace Inc. team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """PyTorch DPT (Dense Prediction Transformers) model.
- This implementation is heavily inspired by OpenMMLab's implementation, found here:
- https://github.com/open-mmlab/mmsegmentation/blob/master/mmseg/models/decode_heads/dpt_head.py.
- """
- import collections.abc
- from collections.abc import Callable
- from dataclasses import dataclass
- import torch
- from torch import nn
- from torch.nn import CrossEntropyLoss
- from ... import initialization as init
- from ...activations import ACT2FN
- from ...backbone_utils import load_backbone
- from ...modeling_layers import GradientCheckpointingLayer
- from ...modeling_outputs import BaseModelOutput, DepthEstimatorOutput, SemanticSegmenterOutput
- from ...modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel
- from ...processing_utils import Unpack
- from ...utils import ModelOutput, TransformersKwargs, auto_docstring, logging, torch_int
- from ...utils.generic import can_return_tuple, merge_with_config_defaults
- from ...utils.output_capturing import capture_outputs
- from .configuration_dpt import DPTConfig
- logger = logging.get_logger(__name__)
@dataclass
@auto_docstring(
    custom_intro="""
    Base class for model's outputs that also contains intermediate activations that can be used at later stages. Useful
    in the context of Vision models.
    """
)
class BaseModelOutputWithIntermediateActivations(ModelOutput):
    r"""
    last_hidden_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
        Sequence of hidden-states at the output of the last layer of the model.
    intermediate_activations (`tuple(torch.FloatTensor)`, *optional*):
        Intermediate activations that can be used to compute hidden states of the model at various layers.
    """

    # NOTE: this field is spelled in the plural (`last_hidden_states`); the embedding modules in this file fill it
    # and `DPTModel.forward` reads it back under this exact name.
    last_hidden_states: torch.FloatTensor | None = None
    # Filled by `DPTViTHybridEmbeddings` with backbone feature maps; `DPTViTEmbeddings` leaves it as `None`.
    intermediate_activations: tuple[torch.FloatTensor, ...] | None = None
@dataclass
@auto_docstring(
    custom_intro="""
    Base class for model's outputs that also contains a pooling of the last hidden states as well as intermediate
    activations that can be used by the model at later stages.
    """
)
class BaseModelOutputWithPoolingAndIntermediateActivations(ModelOutput):
    r"""
    pooler_output (`torch.FloatTensor` of shape `(batch_size, hidden_size)`):
        Last layer hidden-state of the first token of the sequence (classification token) after further processing
        through the layers used for the auxiliary pretraining task. E.g. for BERT-family of models, this returns
        the classification token after processing through a linear layer and a tanh activation function. The linear
        layer weights are trained from the next sentence prediction (classification) objective during pretraining.
    intermediate_activations (`tuple(torch.FloatTensor)`, *optional*):
        Intermediate activations that can be used to compute hidden states of the model at various layers.
    """

    # Final sequence output; `DPTModel.forward` stores the layer-normed encoder output here.
    last_hidden_state: torch.FloatTensor | None = None
    # Output of the pooler, or `None` when the model was built without a pooling layer.
    pooler_output: torch.FloatTensor | None = None
    # Not set explicitly by `DPTModel.forward`; presumably filled by the output-capturing decorator - TODO confirm.
    hidden_states: tuple[torch.FloatTensor, ...] | None = None
    # Not set explicitly by `DPTModel.forward`; presumably filled by the output-capturing decorator - TODO confirm.
    attentions: tuple[torch.FloatTensor, ...] | None = None
    # Forwarded unchanged from the embedding module's `intermediate_activations`.
    intermediate_activations: tuple[torch.FloatTensor, ...] | None = None
class DPTViTHybridEmbeddings(nn.Module):
    """
    Hybrid (backbone + Transformer) embedding module.

    `pixel_values` of shape `(batch_size, num_channels, height, width)` are passed through a backbone; its last
    feature map is projected to `hidden_size` channels, flattened into a token sequence, prefixed with a `[CLS]`
    token and summed with position embeddings. The feature maps of the first two backbone stages are returned as
    intermediate activations.
    """

    def __init__(self, config: DPTConfig, feature_size: tuple[int, int] | None = None):
        super().__init__()

        def as_pair(value):
            # Iterables are kept untouched, anything else is duplicated into a 2-tuple.
            return value if isinstance(value, collections.abc.Iterable) else (value, value)

        image_size = as_pair(config.image_size)
        patch_size = as_pair(config.patch_size)
        num_patches = (image_size[1] // patch_size[1]) * (image_size[0] // patch_size[0])

        self.backbone = load_backbone(config)
        last_stage_channels = self.backbone.channels[-1]
        num_stages = len(self.backbone.channels)
        if num_stages != 3:
            raise ValueError(f"Expected backbone to have 3 output features, got {num_stages}")
        self.residual_feature_map_index = [0, 1]  # Always take the output of the first and second backbone stage

        if feature_size is not None:
            # `feature_size` is normalised here but not used further in this module.
            feature_size = as_pair(feature_size)
            feature_dim = last_stage_channels
        else:
            feat_map_shape = config.backbone_featmap_shape
            feature_size = feat_map_shape[-2:]
            feature_dim = feat_map_shape[1]

        self.image_size = image_size
        # Only the first entry of the patch size is stored; `forward` uses it for both height and width.
        self.patch_size = patch_size[0]
        self.num_channels = config.num_channels

        self.projection = nn.Conv2d(feature_dim, config.hidden_size, kernel_size=1)
        self.cls_token = nn.Parameter(torch.zeros(1, 1, config.hidden_size))
        self.position_embeddings = nn.Parameter(torch.zeros(1, num_patches + 1, config.hidden_size))

    def _resize_pos_embed(self, posemb, grid_size_height, grid_size_width, start_index=1):
        """
        Bilinearly resample the grid part of `posemb` to `(grid_size_height, grid_size_width)`.

        The first `start_index` entries are kept as they are; the remaining entries are treated as a square grid
        (its side is the integer square root of their count).
        """
        prefix_tokens = posemb[:, :start_index]
        grid_tokens = posemb[0, start_index:]

        side = torch_int(len(grid_tokens) ** 0.5)
        # (1, side, side, dim) -> (1, dim, side, side), the layout `interpolate` expects
        grid = grid_tokens.reshape(1, side, side, -1).permute(0, 3, 1, 2)
        grid = nn.functional.interpolate(grid, size=(grid_size_height, grid_size_width), mode="bilinear")
        grid = grid.permute(0, 2, 3, 1).reshape(1, grid_size_height * grid_size_width, -1)

        return torch.cat([prefix_tokens, grid], dim=1)

    def forward(
        self, pixel_values: torch.Tensor, interpolate_pos_encoding: bool = False
    ) -> BaseModelOutputWithIntermediateActivations:
        batch_size, num_channels, height, width = pixel_values.shape
        if num_channels != self.num_channels:
            raise ValueError(
                "Make sure that the channel dimension of the pixel values match with the one set in the configuration."
            )
        # The size check is enforced only when position-encoding interpolation was not requested.
        if not interpolate_pos_encoding and (height != self.image_size[0] or width != self.image_size[1]):
            raise ValueError(
                f"Input image size ({height}*{width}) doesn't match model"
                f" ({self.image_size[0]}*{self.image_size[1]})."
            )

        position_embeddings = self._resize_pos_embed(
            self.position_embeddings, height // self.patch_size, width // self.patch_size
        )

        backbone_output = self.backbone(pixel_values)
        feature_maps = backbone_output.feature_maps

        # Keep the early-stage feature maps so later stages can use them
        intermediate = [feature_maps[index] for index in self.residual_feature_map_index]

        # (batch, hidden, h, w) -> (batch, h * w, hidden)
        patch_tokens = self.projection(feature_maps[-1]).flatten(2).transpose(1, 2)
        cls_tokens = self.cls_token.expand(batch_size, -1, -1)
        embeddings = torch.cat((cls_tokens, patch_tokens), dim=1) + position_embeddings

        return BaseModelOutputWithIntermediateActivations(
            last_hidden_states=embeddings,
            intermediate_activations=intermediate,
        )
class DPTViTEmbeddings(nn.Module):
    """
    Construct the CLS token, position and patch embeddings.
    """

    def __init__(self, config):
        super().__init__()

        self.cls_token = nn.Parameter(torch.zeros(1, 1, config.hidden_size))
        self.patch_embeddings = DPTViTPatchEmbeddings(config)
        num_patches = self.patch_embeddings.num_patches
        # One extra position for the [CLS] token
        self.position_embeddings = nn.Parameter(torch.zeros(1, num_patches + 1, config.hidden_size))
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.config = config

    def _resize_pos_embed(self, posemb, grid_size_height, grid_size_width, start_index=1):
        """
        Bilinearly resample the grid part of `posemb` to `(grid_size_height, grid_size_width)`.

        The first `start_index` entries are kept unchanged; the remaining entries are treated as a square grid whose
        side is the integer square root of their count.
        """
        posemb_tok = posemb[:, :start_index]
        posemb_grid = posemb[0, start_index:]

        old_grid_size = torch_int(posemb_grid.size(0) ** 0.5)

        # (1, side, side, dim) -> (1, dim, side, side), the layout `interpolate` expects
        posemb_grid = posemb_grid.reshape(1, old_grid_size, old_grid_size, -1).permute(0, 3, 1, 2)
        posemb_grid = nn.functional.interpolate(posemb_grid, size=(grid_size_height, grid_size_width), mode="bilinear")
        posemb_grid = posemb_grid.permute(0, 2, 3, 1).reshape(1, grid_size_height * grid_size_width, -1)

        posemb = torch.cat([posemb_tok, posemb_grid], dim=1)

        return posemb

    def forward(self, pixel_values: torch.Tensor) -> BaseModelOutputWithIntermediateActivations:
        """
        Embed `pixel_values` of shape `(batch_size, num_channels, height, width)`.

        Returns an output object whose `last_hidden_states` holds the [CLS] token followed by the patch tokens, with
        position embeddings added and dropout applied.
        """
        _, _, height, width = pixel_values.shape

        # `config.patch_size` may be a single int or an iterable pair (the patch embedding layer accepts both), so
        # normalise it before computing the target grid for the position embeddings.
        patch_size = self.config.patch_size
        if not isinstance(patch_size, collections.abc.Iterable):
            patch_size = (patch_size, patch_size)
        patch_height, patch_width = patch_size

        # possibly interpolate position encodings to handle varying image sizes
        position_embeddings = self._resize_pos_embed(
            self.position_embeddings, height // patch_height, width // patch_width
        )

        embeddings = self.patch_embeddings(pixel_values)

        # add the [CLS] token to the embedded patch tokens
        cls_tokens = self.cls_token.expand(embeddings.size(0), -1, -1)
        embeddings = torch.cat((cls_tokens, embeddings), dim=1)

        # add positional encoding to each token
        embeddings = embeddings + position_embeddings

        embeddings = self.dropout(embeddings)

        return BaseModelOutputWithIntermediateActivations(last_hidden_states=embeddings)
class DPTViTPatchEmbeddings(nn.Module):
    """
    Image to Patch Embedding: a strided convolution whose kernel and stride both equal the patch size, followed by
    flattening of the spatial grid into a token sequence.
    """

    def __init__(self, config: DPTConfig):
        super().__init__()

        def as_pair(value):
            # Iterables are kept untouched, anything else is duplicated into a 2-tuple.
            return value if isinstance(value, collections.abc.Iterable) else (value, value)

        self.image_size = as_pair(config.image_size)
        self.patch_size = as_pair(config.patch_size)
        self.num_channels = config.num_channels
        self.num_patches = (self.image_size[1] // self.patch_size[1]) * (self.image_size[0] // self.patch_size[0])

        self.projection = nn.Conv2d(
            self.num_channels, config.hidden_size, kernel_size=self.patch_size, stride=self.patch_size
        )

    def forward(self, pixel_values: torch.Tensor) -> torch.Tensor:
        # Unpacking into four names also rejects inputs that are not 4-dimensional.
        _, num_channels, _, _ = pixel_values.shape
        if num_channels != self.num_channels:
            raise ValueError(
                "Make sure that the channel dimension of the pixel values match with the one set in the configuration."
            )
        # (batch, hidden, h, w) -> (batch, hidden, h * w) -> (batch, h * w, hidden)
        return self.projection(pixel_values).flatten(2).transpose(1, 2)
- # Copied from transformers.models.bert.modeling_bert.eager_attention_forward
def eager_attention_forward(
    module: nn.Module,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attention_mask: torch.Tensor | None,
    scaling: float | None = None,
    dropout: float = 0.0,
    **kwargs: Unpack[TransformersKwargs],
):
    """
    Plain scaled dot-product attention.

    `scaling` defaults to `query.size(-1) ** -0.5`. When given, `attention_mask` is added to the raw scores before the
    softmax. Dropout on the probabilities follows `module.training`. Returns the context tensor (with dims 1 and 2
    swapped back and made contiguous) together with the attention probabilities.
    """
    scale = query.size(-1) ** -0.5 if scaling is None else scaling

    # Raw attention scores: dot product between "query" and "key"
    scores = torch.matmul(query, key.transpose(2, 3)) * scale
    if attention_mask is not None:
        scores = scores + attention_mask

    probs = nn.functional.softmax(scores, dim=-1)
    probs = nn.functional.dropout(probs, p=dropout, training=module.training)

    context = torch.matmul(probs, value).transpose(1, 2).contiguous()
    return context, probs
- # Copied from transformers.models.vit.modeling_vit.ViTSelfAttention with ViT->DPT
class DPTSelfAttention(nn.Module):
    """Multi-head self-attention: query/key/value projections plus a pluggable attention kernel."""

    def __init__(self, config: DPTConfig):
        super().__init__()
        divisible = config.hidden_size % config.num_attention_heads == 0
        if not divisible and not hasattr(config, "embedding_size"):
            raise ValueError(
                f"The hidden size {config.hidden_size} is not a multiple of the number of attention "
                f"heads {config.num_attention_heads}."
            )

        self.config = config
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = int(config.hidden_size / config.num_attention_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size
        self.dropout_prob = config.attention_probs_dropout_prob
        self.scaling = self.attention_head_size**-0.5
        self.is_causal = False

        self.query = nn.Linear(config.hidden_size, self.all_head_size, bias=config.qkv_bias)
        self.key = nn.Linear(config.hidden_size, self.all_head_size, bias=config.qkv_bias)
        self.value = nn.Linear(config.hidden_size, self.all_head_size, bias=config.qkv_bias)

    def forward(
        self,
        hidden_states: torch.Tensor,
        **kwargs: Unpack[TransformersKwargs],
    ) -> tuple[torch.Tensor, torch.Tensor]:
        head_shape = (hidden_states.shape[0], -1, self.num_attention_heads, self.attention_head_size)

        def split_heads(projected: torch.Tensor) -> torch.Tensor:
            # (batch, seq, all_head_size) -> (batch, heads, seq, head_size)
            return projected.view(*head_shape).transpose(1, 2)

        key_layer = split_heads(self.key(hidden_states))
        value_layer = split_heads(self.value(hidden_states))
        query_layer = split_heads(self.query(hidden_states))

        # Resolve the configured attention implementation, falling back to the eager one.
        attention_interface: Callable = ALL_ATTENTION_FUNCTIONS.get_interface(
            self.config._attn_implementation, eager_attention_forward
        )

        context_layer, attention_probs = attention_interface(
            self,
            query_layer,
            key_layer,
            value_layer,
            None,  # no attention mask is used
            is_causal=self.is_causal,
            scaling=self.scaling,
            dropout=self.dropout_prob if self.training else 0.0,
            **kwargs,
        )

        # Merge the last two dimensions (heads, head_size) back into all_head_size.
        merged_shape = (*context_layer.shape[:-2], self.all_head_size)
        return context_layer.reshape(merged_shape), attention_probs
- # Copied from transformers.models.vit.modeling_vit.ViTSelfOutput with ViTConfig->DPTConfig, ViTSelfOutput->DPTViTSelfOutput
class DPTViTSelfOutput(nn.Module):
    """
    Output projection of the attention block (dense layer followed by dropout).

    No residual is added here: `input_tensor` is accepted but not used, the residual connection is applied by the
    enclosing layer, due to the layernorm applied before each block.
    """

    def __init__(self, config: DPTConfig):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor) -> torch.Tensor:
        return self.dropout(self.dense(hidden_states))
- # Copied from transformers.models.vit.modeling_vit.ViTAttention with ViTConfig->DPTConfig, ViTSelfAttention->DPTSelfAttention, ViTSelfOutput->DPTViTSelfOutput
class DPTViTAttention(nn.Module):
    """Self-attention followed by its output projection; the attention probabilities are discarded."""

    def __init__(self, config: DPTConfig):
        super().__init__()
        self.attention = DPTSelfAttention(config)
        self.output = DPTViTSelfOutput(config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        **kwargs: Unpack[TransformersKwargs],
    ) -> torch.Tensor:
        # Only the context tensor is needed; the second return value (attention probabilities) is dropped.
        context = self.attention(hidden_states, **kwargs)[0]
        return self.output(context, hidden_states)
- # Copied from transformers.models.vit.modeling_vit.ViTIntermediate with ViTConfig->DPTConfig, ViTIntermediate->DPTViTIntermediate
class DPTViTIntermediate(nn.Module):
    """First half of the feed-forward block: expansion to `intermediate_size` followed by the activation."""

    def __init__(self, config: DPTConfig):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
        # `hidden_act` is either a key into ACT2FN or something used directly as the activation.
        activation = config.hidden_act
        self.intermediate_act_fn = ACT2FN[activation] if isinstance(activation, str) else activation

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        return self.intermediate_act_fn(self.dense(hidden_states))
- # Copied from transformers.models.vit.modeling_vit.ViTOutput with ViTConfig->DPTConfig, ViTOutput->DPTViTOutput
class DPTViTOutput(nn.Module):
    """Second half of the feed-forward block: projection back to `hidden_size`, dropout, then the residual add."""

    def __init__(self, config: DPTConfig):
        super().__init__()
        self.dense = nn.Linear(config.intermediate_size, config.hidden_size)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor) -> torch.Tensor:
        projected = self.dropout(self.dense(hidden_states))
        # residual connection with the tensor that entered the feed-forward block
        return projected + input_tensor
- # Copied from transformers.models.vit.modeling_vit.ViTLayer with ViTConfig->DPTConfig, ViTAttention->DPTViTAttention, ViTIntermediate->DPTViTIntermediate, ViTOutput->DPTViTOutput, ViTLayer->DPTViTLayer
class DPTViTLayer(GradientCheckpointingLayer):
    """This corresponds to the Block class in the timm implementation."""

    def __init__(self, config: DPTConfig):
        super().__init__()
        self.chunk_size_feed_forward = config.chunk_size_feed_forward
        self.seq_len_dim = 1
        self.attention = DPTViTAttention(config)
        self.intermediate = DPTViTIntermediate(config)
        self.output = DPTViTOutput(config)
        self.layernorm_before = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.layernorm_after = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)

    def forward(
        self,
        hidden_states: torch.Tensor,
        **kwargs: Unpack[TransformersKwargs],
    ) -> torch.Tensor:
        # Pre-norm attention, then the first residual connection.
        attention_output = self.attention(self.layernorm_before(hidden_states), **kwargs)
        residual = attention_output + hidden_states

        # A second layernorm precedes the feed-forward block.
        feed_forward = self.intermediate(self.layernorm_after(residual))

        # The second residual connection is applied inside `self.output`.
        return self.output(feed_forward, residual)
class DPTReassembleStage(nn.Module):
    """
    Turns the token sequences produced by the backbone into image-like feature maps at various resolutions.

    Every hidden state that is not listed in `config.neck_ignore_stages` goes through three steps:

    1. The readout ([CLS]) token is split off and the remaining N tokens are combined with it according to
       `config.readout_type`.
    2. The channel dimension is projected according to `config.neck_hidden_sizes`.
    3. The spatial dimensions (height, width) are resized.

    Args:
        config (`[DPTConfig]`):
            Model configuration class defining the model architecture.
    """

    def __init__(self, config):
        super().__init__()

        self.config = config
        self.layers = nn.ModuleList()

        build_layers = self._init_reassemble_dpt_hybrid if config.is_hybrid else self._init_reassemble_dpt
        build_layers(config)

        self.neck_ignore_stages = config.neck_ignore_stages

    def _init_reassemble_dpt_hybrid(self, config):
        r"""
        For DPT-Hybrid the first 2 reassemble layers are set to `nn.Identity()`, please check the official
        implementation: https://github.com/isl-org/DPT/blob/f43ef9e08d70a752195028a51be5e1aff227b913/dpt/vit.py#L438
        for more details.
        """
        for index, (channels, factor) in enumerate(zip(config.neck_hidden_sizes, config.reassemble_factors)):
            if index > 1:
                self.layers.append(DPTReassembleLayer(config, channels=channels, factor=factor))
            else:
                self.layers.append(nn.Identity())

        if config.readout_type != "project":
            raise ValueError(f"Readout type {config.readout_type} is not supported for DPT-Hybrid.")

        # When using DPT-Hybrid the readout type is set to "project". The sanity check is done on the config file
        self.readout_projects = nn.ModuleList()
        hidden_size = _get_backbone_hidden_size(config)
        for index in range(len(config.neck_hidden_sizes)):
            if index > 1:
                stages = (nn.Linear(2 * hidden_size, hidden_size), ACT2FN[config.hidden_act])
            else:
                stages = (nn.Identity(),)
            self.readout_projects.append(nn.Sequential(*stages))

    def _init_reassemble_dpt(self, config):
        """Build one reassemble layer per neck stage and, for the "project" readout, one projection per stage."""
        for channels, factor in zip(config.neck_hidden_sizes, config.reassemble_factors):
            self.layers.append(DPTReassembleLayer(config, channels=channels, factor=factor))

        if config.readout_type != "project":
            return

        self.readout_projects = nn.ModuleList()
        hidden_size = _get_backbone_hidden_size(config)
        for _ in config.neck_hidden_sizes:
            self.readout_projects.append(
                nn.Sequential(nn.Linear(2 * hidden_size, hidden_size), ACT2FN[config.hidden_act])
            )

    def forward(self, hidden_states: list[torch.Tensor], patch_height=None, patch_width=None) -> list[torch.Tensor]:
        """
        Args:
            hidden_states (`list[torch.FloatTensor]`, each of shape `(batch_size, sequence_length + 1, hidden_size)`):
                List of hidden states from the backbone.
            patch_height (*optional*):
                Height of the patch grid. A square grid is inferred from the sequence length unless both
                `patch_height` and `patch_width` are given.
            patch_width (*optional*):
                Width of the patch grid, see `patch_height`.
        """
        reassembled = []

        for index, tokens in enumerate(hidden_states):
            if index in self.neck_ignore_stages:
                # ignored stages are passed through untouched
                reassembled.append(tokens)
                continue

            cls_token, patch_tokens = tokens[:, 0], tokens[:, 1:]
            batch_size, sequence_length, num_channels = patch_tokens.shape

            if patch_height is None or patch_width is None:
                side = torch_int(sequence_length**0.5)
                grid = (side, side)
            else:
                grid = (patch_height, patch_width)

            # reshape to (batch_size, num_channels, height, width)
            feature_map = patch_tokens.reshape(batch_size, *grid, num_channels).permute(0, 3, 1, 2).contiguous()
            feature_shape = feature_map.shape

            readout_type = self.config.readout_type
            if readout_type == "project":
                # back to (batch_size, height*width, num_channels)
                flat = feature_map.flatten(2).permute((0, 2, 1))
                readout = cls_token.unsqueeze(1).expand_as(flat)
                # concatenate the readout token to every patch token and project
                flat = self.readout_projects[index](torch.cat((flat, readout), -1))
                # and again to (batch_size, num_channels, height, width)
                feature_map = flat.permute(0, 2, 1).reshape(feature_shape)
            elif readout_type == "add":
                feature_map = (feature_map.flatten(2) + cls_token.unsqueeze(-1)).reshape(feature_shape)

            reassembled.append(self.layers[index](feature_map))

        return reassembled
def _get_backbone_hidden_size(config):
    """Return `config.backbone_config.hidden_size` when it is available, otherwise `config.hidden_size`."""
    backbone_config = config.backbone_config
    if backbone_config is None or not hasattr(backbone_config, "hidden_size"):
        return config.hidden_size
    return backbone_config.hidden_size
class DPTReassembleLayer(nn.Module):
    """1x1 channel projection followed by an up- or down-sampling step selected by `factor`."""

    def __init__(self, config: DPTConfig, channels: int, factor: int):
        super().__init__()
        # projection from the backbone hidden size to the requested number of channels
        self.projection = nn.Conv2d(
            in_channels=_get_backbone_hidden_size(config), out_channels=channels, kernel_size=1
        )

        # up/down sampling depending on factor
        if factor == 1:
            self.resize = nn.Identity()
        elif factor > 1:
            # upsample with a transposed convolution whose kernel and stride both equal the factor
            self.resize = nn.ConvTranspose2d(channels, channels, kernel_size=factor, stride=factor, padding=0)
        elif factor < 1:
            # downsample with a strided 3x3 convolution; the stride is the inverse of the factor
            self.resize = nn.Conv2d(channels, channels, kernel_size=3, stride=int(1 / factor), padding=1)

    def forward(self, hidden_state):
        return self.resize(self.projection(hidden_state))
class DPTFeatureFusionStage(nn.Module):
    """Chain of fusion layers that merges the reassembled feature maps, starting from the last one."""

    def __init__(self, config: DPTConfig):
        super().__init__()
        self.layers = nn.ModuleList([DPTFeatureFusionLayer(config) for _ in config.neck_hidden_sizes])

    def forward(self, hidden_states):
        fused_states = []
        running = None
        # The inputs are walked in reverse order: the last hidden state is fused first.
        for feature_map, layer in zip(hidden_states[::-1], self.layers):
            # The first layer only sees the last hidden state; later ones also get the running fused map.
            running = layer(feature_map) if running is None else layer(running, feature_map)
            fused_states.append(running)

        return fused_states
class DPTPreActResidualLayer(nn.Module):
    """
    ResidualConvUnit, pre-activate residual unit: two (ReLU -> 3x3 conv -> optional batch norm) steps whose result is
    added back to the input.

    Args:
        config (`[DPTConfig]`):
            Model configuration class defining the model architecture.
    """

    def __init__(self, config: DPTConfig):
        super().__init__()

        self.use_batch_norm = config.use_batch_norm_in_fusion_residual
        # An unset bias option falls back to "bias only when batch norm is not used".
        configured_bias = config.use_bias_in_fusion_residual
        conv_bias = (not self.use_batch_norm) if configured_bias is None else configured_bias
        width = config.fusion_hidden_size

        def make_conv() -> nn.Conv2d:
            return nn.Conv2d(width, width, kernel_size=3, stride=1, padding=1, bias=conv_bias)

        self.activation1 = nn.ReLU()
        self.convolution1 = make_conv()

        self.activation2 = nn.ReLU()
        self.convolution2 = make_conv()

        if self.use_batch_norm:
            self.batch_norm1 = nn.BatchNorm2d(width)
            self.batch_norm2 = nn.BatchNorm2d(width)

    def forward(self, hidden_state: torch.Tensor) -> torch.Tensor:
        norms = (self.batch_norm1, self.batch_norm2) if self.use_batch_norm else (None, None)
        steps = zip((self.activation1, self.activation2), (self.convolution1, self.convolution2), norms)

        output = hidden_state
        for activation, convolution, norm in steps:
            output = convolution(activation(output))
            if norm is not None:
                output = norm(output)

        # skip connection with the untouched input
        return output + hidden_state
class DPTFeatureFusionLayer(nn.Module):
    """Feature fusion layer, merges feature maps from different stages.

    Args:
        config (`[DPTConfig]`):
            Model configuration class defining the model architecture.
        align_corners (`bool`, *optional*, defaults to `True`):
            The align_corner setting for bilinear upsample.
    """

    def __init__(self, config: DPTConfig, align_corners: bool = True):
        super().__init__()

        self.align_corners = align_corners

        width = config.fusion_hidden_size
        self.projection = nn.Conv2d(width, width, kernel_size=1, bias=True)

        self.residual_layer1 = DPTPreActResidualLayer(config)
        self.residual_layer2 = DPTPreActResidualLayer(config)

    def forward(self, hidden_state: torch.Tensor, residual: torch.Tensor | None = None) -> torch.Tensor:
        if residual is not None:
            if residual.shape != hidden_state.shape:
                # Bring the residual to the spatial size of the running state (always with align_corners=False).
                target_size = (hidden_state.shape[2], hidden_state.shape[3])
                residual = nn.functional.interpolate(
                    residual, size=target_size, mode="bilinear", align_corners=False
                )
            hidden_state = hidden_state + self.residual_layer1(residual)

        refined = self.residual_layer2(hidden_state)
        # Double the spatial resolution before the final 1x1 projection.
        upsampled = nn.functional.interpolate(
            refined, scale_factor=2, mode="bilinear", align_corners=self.align_corners
        )
        return self.projection(upsampled)
@auto_docstring
class DPTPreTrainedModel(PreTrainedModel):
    config: DPTConfig
    base_model_prefix = "dpt"
    main_input_name = "pixel_values"
    input_modalities = ("image",)
    supports_gradient_checkpointing = True
    # Attention backends declared as supported by this model family
    _supports_sdpa = True
    _supports_flash_attn = True
    _supports_flex_attn = True
    _supports_attention_backend = True
    # Output name -> module class whose outputs are recorded under that name
    _can_record_outputs = {
        "hidden_states": DPTViTLayer,
        "attentions": DPTSelfAttention,
    }

    @torch.no_grad()
    def _init_weights(self, module):
        """Run the parent initialisation, then zero the CLS token and position embeddings of embedding modules."""
        super()._init_weights(module)
        if not isinstance(module, (DPTViTEmbeddings, DPTViTHybridEmbeddings)):
            return
        for parameter in (module.cls_token, module.position_embeddings):
            init.zeros_(parameter)
class DPTViTEncoder(nn.Module):
    """Stack of `config.num_hidden_layers` transformer layers applied one after another."""

    def __init__(self, config: DPTConfig):
        super().__init__()
        self.config = config
        self.layer = nn.ModuleList([DPTViTLayer(config) for _ in range(config.num_hidden_layers)])

    def forward(
        self, hidden_states: torch.Tensor, output_hidden_states: bool = False, **kwargs: Unpack[TransformersKwargs]
    ) -> BaseModelOutput:
        """
        Args:
            hidden_states (`torch.Tensor`):
                Input sequence, fed to the first layer.
            output_hidden_states (`bool`, *optional*, defaults to `False`):
                Accepted for call compatibility; this method does not collect per-layer states itself.
            **kwargs:
                Extra keyword arguments, forwarded to every layer (and from there to the attention implementation).

        Returns:
            `BaseModelOutput` whose `last_hidden_state` is the output of the final layer.
        """
        for layer_module in self.layer:
            # Forward the caller's kwargs: the layers accept them and hand them to the attention backend, so
            # dropping them here would silently ignore backend-specific arguments.
            hidden_states = layer_module(hidden_states, **kwargs)

        return BaseModelOutput(last_hidden_state=hidden_states)
@auto_docstring
class DPTModel(DPTPreTrainedModel):
    def __init__(self, config: DPTConfig, add_pooling_layer: bool = True):
        r"""
        add_pooling_layer (bool, *optional*, defaults to `True`):
            Whether to add a pooling layer
        """
        super().__init__(config)
        self.config = config

        # vit encoder: hybrid configs use the backbone-based embeddings, the others plain patch embeddings
        embeddings_class = DPTViTHybridEmbeddings if config.is_hybrid else DPTViTEmbeddings
        self.embeddings = embeddings_class(config)
        self.encoder = DPTViTEncoder(config)

        self.layernorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.pooler = DPTViTPooler(config) if add_pooling_layer else None

        # Initialize weights and apply final processing
        self.post_init()

    def get_input_embeddings(self):
        # The hybrid embeddings are returned as a whole; the plain ones expose their patch projection module.
        if not self.config.is_hybrid:
            return self.embeddings.patch_embeddings
        return self.embeddings

    @merge_with_config_defaults
    @capture_outputs(tie_last_hidden_states=False)
    @auto_docstring
    def forward(
        self,
        pixel_values: torch.FloatTensor,
        **kwargs: Unpack[TransformersKwargs],
    ) -> BaseModelOutputWithPoolingAndIntermediateActivations:
        embedding_output: BaseModelOutputWithIntermediateActivations = self.embeddings(pixel_values)

        encoder_outputs: BaseModelOutput = self.encoder(embedding_output.last_hidden_states, **kwargs)

        # final layernorm on top of the encoder output
        sequence_output = self.layernorm(encoder_outputs.last_hidden_state)
        pooled_output = None if self.pooler is None else self.pooler(sequence_output)

        return BaseModelOutputWithPoolingAndIntermediateActivations(
            last_hidden_state=sequence_output,
            pooler_output=pooled_output,
            intermediate_activations=embedding_output.intermediate_activations,
        )
# Copied from transformers.models.vit.modeling_vit.ViTPooler with ViTConfig->DPTConfig, ViTPooler->DPTViTPooler
class DPTViTPooler(nn.Module):
    """Pool a sequence by projecting and activating the hidden state at index 0 of dimension 1."""

    def __init__(self, config: DPTConfig):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.pooler_output_size)
        self.activation = ACT2FN[config.pooler_act]

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        # "Pooling" here simply means selecting the hidden state of the first token.
        leading_token = hidden_states[:, 0]
        return self.activation(self.dense(leading_token))
class DPTNeck(nn.Module):
    """
    Neck placed between the backbone and the head: it maps a list of tensors to another list of tensors.

    For DPT it chains two stages, with one 3x3 convolution per feature map in between:

    * DPTReassembleStage (skipped when the configured backbone has model type `"swinv2"`)
    * DPTFeatureFusionStage.

    Args:
        config ([`DPTConfig`]): Model configuration.
    """

    def __init__(self, config: DPTConfig):
        super().__init__()
        self.config = config

        # postprocessing: only required in case of a non-hierarchical backbone (e.g. ViT, BEiT)
        backbone_config = config.backbone_config
        has_swinv2_backbone = backbone_config is not None and backbone_config.model_type == "swinv2"
        self.reassemble_stage = None if has_swinv2_backbone else DPTReassembleStage(config)

        # One bias-free 3x3 convolution per neck level, all projecting to `fusion_hidden_size` channels.
        self.convs = nn.ModuleList(
            [
                nn.Conv2d(in_channels, config.fusion_hidden_size, kernel_size=3, padding=1, bias=False)
                for in_channels in config.neck_hidden_sizes
            ]
        )

        # fusion
        self.fusion_stage = DPTFeatureFusionStage(config)

    def forward(
        self,
        hidden_states: list[torch.Tensor],
        patch_height: int | None = None,
        patch_width: int | None = None,
    ) -> list[torch.Tensor]:
        """
        Args:
            hidden_states (`list[torch.FloatTensor]`, each of shape `(batch_size, sequence_length, hidden_size)` or `(batch_size, hidden_size, height, width)`):
                List of hidden states from the backbone.
            patch_height (`int`, *optional*):
                Forwarded unchanged to the reassemble stage when that stage exists.
            patch_width (`int`, *optional*):
                Forwarded unchanged to the reassemble stage when that stage exists.

        Raises:
            TypeError: If `hidden_states` is neither a tuple nor a list.
            ValueError: If the number of hidden states differs from `len(config.neck_hidden_sizes)`.
        """
        if not isinstance(hidden_states, (tuple, list)):
            raise TypeError("hidden_states should be a tuple or list of tensors")

        expected_count = len(self.config.neck_hidden_sizes)
        if len(hidden_states) != expected_count:
            raise ValueError("The number of hidden states should be equal to the number of neck hidden sizes.")

        # postprocess hidden states
        if self.reassemble_stage is not None:
            hidden_states = self.reassemble_stage(hidden_states, patch_height, patch_width)

        projected = [self.convs[level](state) for level, state in enumerate(hidden_states)]

        # fusion blocks
        return self.fusion_stage(projected)
class DPTDepthEstimationHead(nn.Module):
    """
    Depth output head made of three convolutions, with a 2x bilinear upsampling after the first one.

    The first convolution halves the channel count, the second maps to 32 channels and the last one to a
    single channel; the last two are each followed by a ReLU. When `config.add_projection` is set, an extra
    3x3 convolution (256 -> 256 channels) plus ReLU is applied before the head.
    """

    def __init__(self, config: DPTConfig):
        super().__init__()
        self.config = config

        # Optional pre-head projection; the channel count is fixed at 256.
        self.projection = (
            nn.Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)) if config.add_projection else None
        )

        channels = config.fusion_hidden_size
        half_channels = channels // 2
        stages = [
            nn.Conv2d(channels, half_channels, kernel_size=3, stride=1, padding=1),
            nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True),
            nn.Conv2d(half_channels, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 1, kernel_size=1, stride=1, padding=0),
            nn.ReLU(),
        ]
        self.head = nn.Sequential(*stages)

    def forward(self, hidden_states: list[torch.Tensor]) -> torch.Tensor:
        """Select the feature map at `config.head_in_index` and return its depth map with the channel axis removed."""
        # use last features
        features = hidden_states[self.config.head_in_index]

        if self.projection is not None:
            features = nn.functional.relu(self.projection(features))

        # The head emits one channel; squeeze it so the result has no channel dimension.
        return self.head(features).squeeze(dim=1)
@auto_docstring(
    custom_intro="""
    DPT Model with a depth estimation head on top (consisting of 3 convolutional layers) e.g. for KITTI, NYUv2.
    """
)
class DPTForDepthEstimation(DPTPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)

        # Features come either from an external backbone or from the built-in DPT encoder, never both:
        # `self.dpt` only exists when no backbone is loaded.
        self.backbone = None
        if config.is_hybrid is False and config.backbone_config is not None:
            self.backbone = load_backbone(config)
        else:
            self.dpt = DPTModel(config, add_pooling_layer=False)

        # Neck
        self.neck = DPTNeck(config)

        # Depth estimation head
        self.head = DPTDepthEstimationHead(config)

        # Initialize weights and apply final processing
        self.post_init()

    @can_return_tuple
    @auto_docstring
    def forward(
        self,
        pixel_values: torch.FloatTensor,
        labels: torch.LongTensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> DepthEstimatorOutput:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, height, width)`, *optional*):
            Ground truth depth estimation maps for computing the loss.

        Examples:
        ```python
        >>> from transformers import AutoImageProcessor, DPTForDepthEstimation
        >>> import torch
        >>> import numpy as np
        >>> from PIL import Image
        >>> import httpx
        >>> from io import BytesIO

        >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg"
        >>> with httpx.stream("GET", url) as response:
        ...     image = Image.open(BytesIO(response.read()))

        >>> image_processor = AutoImageProcessor.from_pretrained("Intel/dpt-large")
        >>> model = DPTForDepthEstimation.from_pretrained("Intel/dpt-large")

        >>> # prepare image for the model
        >>> inputs = image_processor(images=image, return_tensors="pt")

        >>> with torch.no_grad():
        ...     outputs = model(**inputs)

        >>> # interpolate to original size
        >>> post_processed_output = image_processor.post_process_depth_estimation(
        ...     outputs,
        ...     target_sizes=[(image.height, image.width)],
        ... )

        >>> # visualize the prediction
        >>> predicted_depth = post_processed_output[0]["predicted_depth"]
        >>> depth = predicted_depth * 255 / predicted_depth.max()
        >>> depth = depth.detach().cpu().numpy()
        >>> depth = Image.fromarray(depth.astype("uint8"))
        ```"""
        loss = None
        if labels is not None:
            raise NotImplementedError("Training is not implemented yet")

        # Internally the model always needs to output hidden states, we control the output
        # per user request on the final output. An explicit argument (including `False`) takes
        # precedence; the config value is only the fallback when nothing was passed.
        user_requested_hidden_states = kwargs.get("output_hidden_states")
        if user_requested_hidden_states is None:
            user_requested_hidden_states = getattr(self.config, "output_hidden_states", False)
        kwargs["output_hidden_states"] = True

        if self.backbone is not None:
            outputs = self.backbone.forward_with_filtered_kwargs(pixel_values, **kwargs)
            hidden_states = outputs.feature_maps
        else:
            outputs = self.dpt(pixel_values, **kwargs)

            # only keep certain features based on config.backbone_out_indices
            # note that the hidden_states also include the initial embeddings, hence the `[1:]`
            layer_outputs = outputs.hidden_states[1:]
            out_indices = self.config.backbone_out_indices
            if not self.config.is_hybrid:
                hidden_states = [feature for idx, feature in enumerate(layer_outputs) if idx in out_indices]
            else:
                # The intermediate activations come first, followed by the encoder layers selected by
                # the out indices after the first two. Work on a copy so that the list stored on
                # `outputs` is not mutated.
                transformer_indices = out_indices[2:]
                hidden_states = list(outputs.intermediate_activations)
                hidden_states.extend(
                    feature for idx, feature in enumerate(layer_outputs) if idx in transformer_indices
                )

        # Patch-grid dimensions are only computed (and passed to the neck) for non-hybrid models
        # that define a backbone config.
        patch_height, patch_width = None, None
        if self.config.backbone_config is not None and self.config.is_hybrid is False:
            _, _, height, width = pixel_values.shape
            patch_size = self.config.backbone_config.patch_size
            patch_height = height // patch_size
            patch_width = width // patch_size

        hidden_states = self.neck(hidden_states, patch_height, patch_width)

        predicted_depth = self.head(hidden_states)

        return DepthEstimatorOutput(
            loss=loss,
            predicted_depth=predicted_depth,
            hidden_states=outputs.hidden_states if user_requested_hidden_states else None,
            attentions=outputs.attentions,
        )
class DPTSemanticSegmentationHead(nn.Module):
    """
    Segmentation head: 3x3 convolution, batch norm, ReLU and dropout, then a 1x1 convolution producing
    `config.num_labels` channels, followed by a 2x bilinear upsampling.
    """

    def __init__(self, config: DPTConfig):
        super().__init__()
        self.config = config

        channels = config.fusion_hidden_size
        stages = [
            nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(channels),
            nn.ReLU(),
            nn.Dropout(config.semantic_classifier_dropout),
            nn.Conv2d(channels, config.num_labels, kernel_size=1),
            nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True),
        ]
        self.head = nn.Sequential(*stages)

    def forward(self, hidden_states: list[torch.Tensor]) -> torch.Tensor:
        """Return the logits computed from the feature map at `config.head_in_index`."""
        # use last features
        selected = hidden_states[self.config.head_in_index]
        return self.head(selected)
class DPTAuxiliaryHead(nn.Module):
    """
    Auxiliary segmentation head: 3x3 convolution, batch norm, ReLU and dropout (p=0.1), then a 1x1
    convolution producing `config.num_labels` channels. Unlike the main head, it does not upsample.
    """

    def __init__(self, config: DPTConfig):
        super().__init__()

        channels = config.fusion_hidden_size
        stages = [
            nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(channels),
            nn.ReLU(),
            nn.Dropout(p=0.1, inplace=False),
            nn.Conv2d(channels, config.num_labels, kernel_size=1),
        ]
        self.head = nn.Sequential(*stages)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Apply the head to a single feature map and return the resulting logits."""
        return self.head(hidden_states)
@auto_docstring
class DPTForSemanticSegmentation(DPTPreTrainedModel):
    def __init__(self, config: DPTConfig):
        super().__init__(config)

        self.dpt = DPTModel(config, add_pooling_layer=False)

        # Neck
        self.neck = DPTNeck(config)

        # Segmentation head(s); the auxiliary head only exists when the config asks for it
        self.head = DPTSemanticSegmentationHead(config)
        self.auxiliary_head = DPTAuxiliaryHead(config) if config.use_auxiliary_head else None

        # Initialize weights and apply final processing
        self.post_init()

    @can_return_tuple
    @auto_docstring
    def forward(
        self,
        pixel_values: torch.FloatTensor | None = None,
        labels: torch.LongTensor | None = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> SemanticSegmenterOutput:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, height, width)`, *optional*):
            Ground truth semantic segmentation maps for computing the loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels > 1`, a classification loss is computed (Cross-Entropy).

        Examples:
        ```python
        >>> from transformers import AutoImageProcessor, DPTForSemanticSegmentation
        >>> from PIL import Image
        >>> import httpx
        >>> from io import BytesIO

        >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg"
        >>> with httpx.stream("GET", url) as response:
        ...     image = Image.open(BytesIO(response.read()))

        >>> image_processor = AutoImageProcessor.from_pretrained("Intel/dpt-large-ade")
        >>> model = DPTForSemanticSegmentation.from_pretrained("Intel/dpt-large-ade")

        >>> inputs = image_processor(images=image, return_tensors="pt")

        >>> outputs = model(**inputs)
        >>> logits = outputs.logits
        ```"""
        if labels is not None and self.config.num_labels == 1:
            raise ValueError("The number of labels should be greater than one")

        # Internally the model always needs to output hidden states, we control the output
        # per user request on the final output. An explicit argument (including `False`) takes
        # precedence; the config value is only the fallback when nothing was passed.
        user_requested_hidden_states = kwargs.get("output_hidden_states")
        if user_requested_hidden_states is None:
            user_requested_hidden_states = getattr(self.config, "output_hidden_states", False)
        kwargs["output_hidden_states"] = True

        outputs: BaseModelOutputWithPoolingAndIntermediateActivations = self.dpt(pixel_values, **kwargs)

        # only keep certain features based on config.backbone_out_indices
        # note that the hidden_states also include the initial embeddings, hence the `[1:]`
        layer_outputs = outputs.hidden_states[1:]
        out_indices = self.config.backbone_out_indices
        if not self.config.is_hybrid:
            hidden_states = [feature for idx, feature in enumerate(layer_outputs) if idx in out_indices]
        else:
            # The intermediate activations come first, followed by the encoder layers selected by
            # the out indices after the first two. Work on a copy so that the list stored on
            # `outputs` is not mutated.
            transformer_indices = out_indices[2:]
            hidden_states = list(outputs.intermediate_activations)
            hidden_states.extend(feature for idx, feature in enumerate(layer_outputs) if idx in transformer_indices)

        hidden_states = self.neck(hidden_states=hidden_states)

        logits = self.head(hidden_states)

        auxiliary_logits = None
        if self.auxiliary_head is not None:
            auxiliary_logits = self.auxiliary_head(hidden_states[-1])

        loss = None
        if labels is not None:
            loss_fct = CrossEntropyLoss(ignore_index=self.config.semantic_loss_ignore_index)

            # upsample logits to the images' original size
            upsampled_logits = nn.functional.interpolate(
                logits, size=labels.shape[-2:], mode="bilinear", align_corners=False
            )
            loss = loss_fct(upsampled_logits, labels)

            # The auxiliary term is only added when an auxiliary head produced logits; without this
            # guard, passing labels to a model built without that head hit an unassigned local variable.
            if auxiliary_logits is not None:
                upsampled_auxiliary_logits = nn.functional.interpolate(
                    auxiliary_logits, size=labels.shape[-2:], mode="bilinear", align_corners=False
                )
                auxiliary_loss = loss_fct(upsampled_auxiliary_logits, labels)
                # compute weighted loss
                loss = loss + self.config.auxiliary_loss_weight * auxiliary_loss

        return SemanticSegmenterOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states if user_requested_hidden_states else None,
            attentions=outputs.attentions,
        )
# Names exported by `from <this module> import *`.
__all__ = ["DPTForDepthEstimation", "DPTForSemanticSegmentation", "DPTModel", "DPTPreTrainedModel"]
|