| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- # Copyright 2022 The OpenBMB Team and The HuggingFace Inc. team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Tokenization classes for CPMAnt."""
- import collections
- import os
- from transformers.utils import is_rjieba_available, requires_backends
- if is_rjieba_available():
- import rjieba
- from ...tokenization_python import PreTrainedTokenizer
- from ...utils import logging
- logger = logging.get_logger(__name__)
- VOCAB_FILES_NAMES = {"vocab_file": "vocab.txt"}
def load_vocab(vocab_file):
    """
    Read a vocabulary file into an ordered `token -> index` mapping.

    The file is read as UTF-8 with one token per line; the index of a token is the zero-based number of the line it
    appears on. Only the trailing newline is removed, so any other whitespace in a line is part of the token. If a
    token appears more than once, it keeps the position of its first occurrence and the index of its last one.

    Args:
        vocab_file (`str`):
            Path to the vocabulary file.

    Returns:
        `collections.OrderedDict`: Mapping from token to line index.
    """
    with open(vocab_file, "r", encoding="utf-8") as handle:
        lines = handle.readlines()
    return collections.OrderedDict((line.rstrip("\n"), position) for position, line in enumerate(lines))
class WordpieceTokenizer:
    """
    Greedy longest-match-first splitter over a fixed vocabulary.

    Args:
        vocab:
            Container supporting `in` checks for the known pieces.
        unk_token (`str`, *optional*, defaults to `"<unk>"`):
            Piece emitted for anything that cannot be matched.
        max_input_chars_per_word (`int`, *optional*, defaults to 200):
            Inputs longer than this many characters are replaced by a single `unk_token`.
    """

    def __init__(self, vocab, unk_token="<unk>", max_input_chars_per_word=200):
        self.vocab = vocab
        self.unk_token = unk_token
        self.max_input_chars_per_word = max_input_chars_per_word

    def tokenize(self, token):
        """
        Split `token` into vocabulary pieces.

        Starting from the left, the longest substring found in the vocabulary is taken as the next piece. When no
        substring starting at the current position is known, `unk_token` is emitted and one character is skipped.

        Returns:
            `list[str]`: The pieces in order; `[unk_token]` if the input exceeds `max_input_chars_per_word`.
        """
        symbols = list(token)
        total = len(symbols)
        if total > self.max_input_chars_per_word:
            return [self.unk_token]

        pieces = []
        cursor = 0
        while cursor < total:
            # Try the longest candidate first and shrink it from the right.
            for stop in range(total, cursor, -1):
                candidate = "".join(symbols[cursor:stop])
                if candidate in self.vocab:
                    pieces.append(candidate)
                    cursor = stop
                    break
            else:
                # Nothing starting at `cursor` is in the vocabulary.
                pieces.append(self.unk_token)
                cursor += 1
        return pieces
class CpmAntTokenizer(PreTrainedTokenizer):
    """
    Construct a CPMAnt tokenizer.

    Text is first segmented with `rjieba`; every segment is then split into vocabulary entries by a
    `WordpieceTokenizer`. In memory, the vocabulary entries for `space_token` and `line_token` are stored under the
    literal space and newline characters instead of their spelled-out forms.

    Args:
        vocab_file (`str`):
            Path to the vocabulary file.
        bod_token (`str`, *optional*, defaults to `"<d>"`):
            The beginning of document token.
        eod_token (`str`, *optional*, defaults to `"</d>"`):
            The end of document token.
        bos_token (`str`, *optional*, defaults to `"<s>"`):
            The beginning of sequence token.
        eos_token (`str`, *optional*, defaults to `"</s>"`):
            The end of sequence token.
        pad_token (`str`, *optional*, defaults to `"<pad>"`):
            The token used for padding.
        unk_token (`str`, *optional*, defaults to `"<unk>"`):
            The unknown token.
        line_token (`str`, *optional*, defaults to `"</n>"`):
            The line token.
        space_token (`str`, *optional*, defaults to `"</_>"`):
            The space token.
        padding_side (`str`, *optional*, defaults to `"left"`):
            Padding side forwarded to the base tokenizer.
    """

    vocab_files_names = VOCAB_FILES_NAMES
    model_input_names = ["input_ids", "attention_mask"]
    add_prefix_space = False

    def __init__(
        self,
        vocab_file,
        bod_token="<d>",
        eod_token="</d>",
        bos_token="<s>",
        eos_token="</s>",
        pad_token="<pad>",
        unk_token="<unk>",
        line_token="</n>",
        space_token="</_>",
        padding_side="left",
        **kwargs,
    ):
        # `_tokenize` relies on rjieba for segmentation.
        requires_backends(self, ["rjieba"])
        self.bod_token = bod_token
        self.eod_token = eod_token

        # Re-key the space and line entries to the real characters they stand for.
        # A KeyError is raised here if the vocabulary file lacks either token.
        self.encoder = load_vocab(vocab_file)
        self.encoder[" "] = self.encoder[space_token]
        self.encoder["\n"] = self.encoder[line_token]

        del self.encoder[space_token]
        del self.encoder[line_token]

        # The re-keyed entries were appended at the end; restore ordering by id.
        self.encoder = collections.OrderedDict(sorted(self.encoder.items(), key=lambda x: x[1]))
        self.decoder = {v: k for k, v in self.encoder.items()}

        # The word-piece splitter shares the encoder object as its vocabulary.
        self.wordpiece_tokenizer = WordpieceTokenizer(vocab=self.encoder, unk_token=unk_token)

        super().__init__(
            bod_token=bod_token,
            eod_token=eod_token,
            bos_token=bos_token,
            eos_token=eos_token,
            pad_token=pad_token,
            unk_token=unk_token,
            line_token=line_token,
            space_token=space_token,
            padding_side=padding_side,
            token_type_ids_pattern="all_zeros",
            token_type_ids_include_special_tokens=True,
            special_tokens_pattern="bos",
            **kwargs,
        )

        # Drop the space/line tokens from the added-token tables if the base class registered them there.
        for special_token in [space_token, line_token]:
            token_id = self.added_tokens_encoder.pop(special_token, None)
            if token_id is not None:
                self._added_tokens_decoder.pop(token_id, None)
        self._update_total_vocab_size()

    @property
    def bod_token_id(self):
        """Vocabulary id of `bod_token`."""
        return self.encoder[self.bod_token]

    @property
    def eod_token_id(self):
        """Vocabulary id of `eod_token`."""
        return self.encoder[self.eod_token]

    @property
    def newline_id(self):
        """Vocabulary id stored under the newline character."""
        return self.encoder["\n"]

    @property
    def vocab_size(self) -> int:
        """Number of entries in the base vocabulary (added tokens are not counted)."""
        return len(self.encoder)

    def get_vocab(self):
        """Return the base vocabulary merged with the added tokens as a plain `dict`."""
        return dict(self.encoder, **self.added_tokens_encoder)

    def _tokenize(self, text):
        """Tokenize a string: segment it with rjieba, then split every segment into vocabulary pieces."""
        output_tokens = []
        for x in rjieba.cut(text, False):
            output_tokens.extend(self.wordpiece_tokenizer.tokenize(x))
        return output_tokens

    def _decode(self, token_ids, **kwargs):
        """Decode ids into a string, ignoring negative ids and the pad/eos/bos ids."""
        # Resolve the special ids once instead of once per element.
        skipped_ids = (self.pad_token_id, self.eos_token_id, self.bos_token_id)
        token_ids = [i for i in token_ids if i >= 0 and i not in skipped_ids]
        return super()._decode(token_ids, **kwargs)

    def check(self, token):
        """Return whether `token` is an entry of the base vocabulary."""
        return token in self.encoder

    def convert_tokens_to_string(self, tokens: list[str]) -> str:
        """Concatenate `tokens` without any separator."""
        return "".join(tokens)

    def _convert_token_to_id(self, token):
        """Converts a token (str) in an id using the vocab, falling back to the id of `unk_token`."""
        return self.encoder.get(token, self.encoder.get(self.unk_token))

    def _convert_id_to_token(self, index):
        """Converts an index (integer) in a token (str) using the vocab, falling back to `unk_token`."""
        return self.decoder.get(index, self.unk_token)

    def save_vocabulary(self, save_directory: str, filename_prefix: str | None = None) -> tuple[str]:
        """
        Write the base vocabulary to disk, one token per line, ordered by id.

        The entries stored under the space and newline characters are written as `"</_>"` and `"</n>"`. The
        tokenizer's own `encoder` is left untouched, so the instance keeps working after saving.

        Args:
            save_directory (`str`):
                An existing directory, in which case the file is named after `VOCAB_FILES_NAMES["vocab_file"]`;
                any other value is used as the file path itself.
            filename_prefix (`str`, *optional*):
                If given, `filename_prefix + "-"` is prepended to the file name (or to the whole path when
                `save_directory` is not a directory).

        Returns:
            `tuple[str]`: One-element tuple holding the path of the written file.
        """
        prefix = filename_prefix + "-" if filename_prefix else ""
        if os.path.isdir(save_directory):
            vocab_file = os.path.join(save_directory, prefix + VOCAB_FILES_NAMES["vocab_file"])
        else:
            vocab_file = prefix + save_directory

        # Serialize from a copy: `self.encoder` is the same object the word-piece tokenizer uses as its
        # vocabulary, so renaming keys in place would break tokenization of spaces/newlines and `newline_id`.
        vocab_to_save = dict(self.encoder)
        # NOTE(review): the on-disk spellings are hard-coded rather than taken from the `space_token` /
        # `line_token` passed to `__init__` -- confirm whether custom values should round-trip.
        if " " in vocab_to_save:
            vocab_to_save["</_>"] = vocab_to_save.pop(" ")
        if "\n" in vocab_to_save:
            vocab_to_save["</n>"] = vocab_to_save.pop("\n")

        index = 0
        with open(vocab_file, "w", encoding="utf-8") as writer:
            for token, token_index in sorted(vocab_to_save.items(), key=lambda item: item[1]):
                if index != token_index:
                    # A gap (or repeat) in the ids means line numbers will no longer match ids on reload.
                    logger.warning(
                        f"Saving vocabulary to {vocab_file}: vocabulary indices are not consecutive."
                        " Please check that the vocabulary is not corrupted!"
                    )
                    index = token_index
                writer.write(token + "\n")
                index += 1
        return (vocab_file,)
- __all__ = ["CpmAntTokenizer"]
|