# Copyright 2024 Cohere Inc. HuggingFace Inc. team. All rights reserved. # # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from collections.abc import Callable import torch import torch.nn as nn from huggingface_hub.dataclasses import strict from ...cache_utils import Cache, DynamicCache from ...configuration_utils import PreTrainedConfig from ...masking_utils import create_causal_mask, create_sliding_window_causal_mask from ...modeling_outputs import BaseModelOutputWithPast from ...modeling_rope_utils import ( RopeParameters, dynamic_rope_update, ) from ...modeling_utils import ALL_ATTENTION_FUNCTIONS from ...processing_utils import Unpack from ...utils import TransformersKwargs, auto_docstring, logging from ...utils.generic import maybe_autocast from ..cohere.modeling_cohere import ( CohereAttention, CohereDecoderLayer, CohereForCausalLM, CohereLayerNorm, CoherePreTrainedModel, CohereRotaryEmbedding, apply_rotary_pos_emb, eager_attention_forward, ) from ..gemma2.modeling_gemma2 import Gemma2Model logger = logging.get_logger(__name__) @auto_docstring(checkpoint="CohereForAI/c4ai-command-r-v01") @strict class Cohere2Config(PreTrainedConfig): r""" logit_scale (`float`, *optional*, defaults to 0.0625): The scaling factor for the output logits. ```python >>> from transformers import Cohere2Model, Cohere2Config >>> # Initializing a Cohere Nextmodel configuration >>> configuration = Cohere2Config() >>> # Initializing a model from the Cohere2 configuration >>> model = Cohere2Model(configuration) # doctest: +SKIP >>> # Accessing the model configuration >>> configuration = model.config # doctest: +SKIP ``` """ model_type = "cohere2" keys_to_ignore_at_inference = ["past_key_values"] base_model_tp_plan = { "layers.*.self_attn.q_proj": "colwise", "layers.*.self_attn.k_proj": "colwise", "layers.*.self_attn.v_proj": "colwise", "layers.*.self_attn.o_proj": "rowwise", "layers.*.mlp.gate_proj": "colwise", "layers.*.mlp.up_proj": "colwise", "layers.*.mlp.down_proj": "rowwise", } base_model_pp_plan = { "embed_tokens": (["input_ids"], ["inputs_embeds"]), "layers": (["hidden_states", "attention_mask"], ["hidden_states"]), "norm": (["hidden_states"], ["hidden_states"]), } vocab_size: int = 256000 hidden_size: int = 8192 intermediate_size: int = 22528 logit_scale: float = 0.0625 num_hidden_layers: int = 40 num_attention_heads: int = 64 num_key_value_heads: int | None = None hidden_act: str = "silu" max_position_embeddings: int = 8192 initializer_range: float = 0.02 layer_norm_eps: float = 1e-5 use_cache: bool = True pad_token_id: int | None = 0 bos_token_id: int | None = 5 eos_token_id: int | list[int] | None = 255001 tie_word_embeddings: bool = True rope_parameters: RopeParameters | dict | None = None attention_bias: bool = False attention_dropout: float | int = 0.0 sliding_window: int | None = 4096 layer_types: list[str] | None = None def __post_init__(self, **kwargs): if self.num_key_value_heads is None: self.num_key_value_heads = self.num_attention_heads # Need to specify head_dim in the config so it can be used in the attention forward functions self.head_dim = self.hidden_size // self.num_attention_heads # BC -> the pattern used to be a simple int, and it's still present in configs on the Hub if self.layer_types is None: # BC -> the pattern used to be a simple int, and it's still present in configs on the Hub _sliding_window_pattern = kwargs.pop("sliding_window_pattern", 4) self.layer_types = [ "sliding_attention" if bool((i + 1) % _sliding_window_pattern) else "full_attention" for i in range(self.num_hidden_layers) ] super().__post_init__(**kwargs) class Cohere2RotaryEmbedding(CohereRotaryEmbedding): @torch.no_grad() @dynamic_rope_update # power user: used with advanced RoPE types (e.g. dynamic rope) def forward(self, x, position_ids): inv_freq_expanded = self.inv_freq[None, :, None].float().expand(position_ids.shape[0], -1, 1) position_ids_expanded = position_ids[:, None, :].float() device_type = x.device.type if isinstance(x.device.type, str) and x.device.type != "mps" else "cpu" with maybe_autocast(device_type=device_type, enabled=False): # Force float32 freqs = (inv_freq_expanded.float() @ position_ids_expanded.float()).transpose(1, 2) emb = torch.repeat_interleave(freqs, 2, dim=-1) # diff from Llama: we interleave() instead of cat() cos = emb.cos() * self.attention_scaling sin = emb.sin() * self.attention_scaling return cos.to(dtype=x.dtype), sin.to(dtype=x.dtype) class Cohere2LayerNorm(CohereLayerNorm): pass class Cohere2Attention(CohereAttention): """Multi-headed attention from 'Attention Is All You Need' paper""" def __init__(self, config: Cohere2Config, layer_idx: int | None = None): nn.Module.__init__(self) self.config = config self.layer_idx = layer_idx self.head_dim = getattr(config, "head_dim", config.hidden_size // config.num_attention_heads) self.num_key_value_groups = config.num_attention_heads // config.num_key_value_heads self.scaling = self.head_dim**-0.5 self.attention_dropout = config.attention_dropout self.is_causal = True layer_type = config.layer_types[layer_idx] if hasattr(config, "layer_types") else None self.sliding_window = config.sliding_window if layer_type == "sliding_attention" else None self.q_proj = nn.Linear( config.hidden_size, config.num_attention_heads * self.head_dim, bias=config.attention_bias ) self.k_proj = nn.Linear( config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias ) self.v_proj = nn.Linear( config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias ) self.o_proj = nn.Linear( config.num_attention_heads * self.head_dim, config.hidden_size, bias=config.attention_bias ) def forward( self, hidden_states: torch.Tensor, position_embeddings: tuple[torch.Tensor, torch.Tensor], attention_mask: torch.Tensor | None, past_key_values: Cache | None = None, **kwargs: Unpack[TransformersKwargs], ) -> tuple[torch.Tensor, torch.Tensor | None, tuple[torch.Tensor] | None]: input_shape = hidden_states.shape[:-1] hidden_shape = (*input_shape, -1, self.head_dim) query_states = self.q_proj(hidden_states).view(hidden_shape).transpose(1, 2) key_states = self.k_proj(hidden_states).view(hidden_shape).transpose(1, 2) value_states = self.v_proj(hidden_states).view(hidden_shape).transpose(1, 2) cos, sin = position_embeddings if self.sliding_window is not None: query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin) if past_key_values is not None: key_states, value_states = past_key_values.update(key_states, value_states, self.layer_idx) attention_interface: Callable = ALL_ATTENTION_FUNCTIONS.get_interface( self.config._attn_implementation, eager_attention_forward ) attn_output, attn_weights = attention_interface( self, query_states, key_states, value_states, attention_mask, dropout=0.0 if not self.training else self.attention_dropout, scaling=self.scaling, sliding_window=self.sliding_window, **kwargs, ) attn_output = attn_output.reshape(*input_shape, -1).contiguous() attn_output = self.o_proj(attn_output) return attn_output, attn_weights class Cohere2DecoderLayer(CohereDecoderLayer): def __init__(self, config: Cohere2Config, layer_idx: int): super().__init__(config, layer_idx) def forward( self, hidden_states: torch.Tensor, position_embeddings: tuple[torch.Tensor, torch.Tensor] | None = None, attention_mask: torch.Tensor | None = None, past_key_values: Cache | None = None, use_cache: bool | None = False, **kwargs: Unpack[TransformersKwargs], ) -> tuple[torch.FloatTensor, tuple[torch.FloatTensor, torch.FloatTensor] | None]: residual = hidden_states hidden_states = self.input_layernorm(hidden_states) hidden_states_attention, _ = self.self_attn( hidden_states=hidden_states, position_embeddings=position_embeddings, attention_mask=attention_mask, past_key_values=past_key_values, use_cache=use_cache, **kwargs, ) hidden_states_mlp = self.mlp(hidden_states) hidden_states = residual + hidden_states_attention + hidden_states_mlp return hidden_states class Cohere2PreTrainedModel(CoherePreTrainedModel): config: Cohere2Config _can_record_outputs = { "hidden_states": Cohere2DecoderLayer, "attentions": Cohere2Attention, } class Cohere2Model(Gemma2Model): def __init__(self, config: Cohere2Config): super().__init__(config) self.norm = Cohere2LayerNorm(hidden_size=(config.hidden_size), eps=config.layer_norm_eps) self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx) def forward( self, input_ids: torch.LongTensor | None = None, attention_mask: torch.Tensor | None = None, position_ids: torch.LongTensor | None = None, past_key_values: Cache | None = None, inputs_embeds: torch.FloatTensor | None = None, use_cache: bool | None = None, **kwargs: Unpack[TransformersKwargs], ) -> BaseModelOutputWithPast: if (input_ids is None) ^ (inputs_embeds is not None): raise ValueError("You must specify exactly one of input_ids or inputs_embeds") if inputs_embeds is None: inputs_embeds = self.embed_tokens(input_ids) if use_cache and past_key_values is None: past_key_values = DynamicCache(config=self.config) if position_ids is None: past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0 position_ids = torch.arange(inputs_embeds.shape[1], device=inputs_embeds.device) + past_seen_tokens position_ids = position_ids.unsqueeze(0) if not isinstance(causal_mask_mapping := attention_mask, dict): mask_kwargs = { "config": self.config, "inputs_embeds": inputs_embeds, "attention_mask": attention_mask, "past_key_values": past_key_values, "position_ids": position_ids, } causal_mask_mapping = { "full_attention": create_causal_mask(**mask_kwargs), "sliding_attention": create_sliding_window_causal_mask(**mask_kwargs), } hidden_states = inputs_embeds position_embeddings = self.rotary_emb(hidden_states, position_ids) for i, decoder_layer in enumerate(self.layers): hidden_states = decoder_layer( hidden_states, attention_mask=causal_mask_mapping[self.config.layer_types[i]], position_embeddings=position_embeddings, past_key_values=past_key_values, use_cache=use_cache, position_ids=position_ids, **kwargs, ) hidden_states = self.norm(hidden_states) return BaseModelOutputWithPast( last_hidden_state=hidden_states, past_key_values=past_key_values, ) class Cohere2ForCausalLM(CohereForCausalLM): pass __all__ = ["Cohere2Config", "Cohere2ForCausalLM", "Cohere2Model", "Cohere2PreTrainedModel"]