| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- import collections
- import types
- import numpy as np
- from ..generation import GenerationConfig
- from ..utils import (
- add_end_docstrings,
- is_torch_available,
- requires_backends,
- )
- from .base import ArgumentHandler, Dataset, Pipeline, PipelineException, build_pipeline_init_args
- if is_torch_available():
- import torch
- from ..models.auto.modeling_auto import (
- MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING_NAMES,
- MODEL_FOR_TABLE_QUESTION_ANSWERING_MAPPING_NAMES,
- )
class TableQuestionAnsweringArgumentHandler(ArgumentHandler):
    """
    Handles arguments for the TableQuestionAnsweringPipeline
    """

    def __call__(self, table=None, query=None, **kwargs):
        """
        Normalize the user-supplied arguments into a list of pipeline inputs.

        Accepted forms:

        - `table` and `query` given separately;
        - `table` given as a single `{"table": ..., "query": ...}` dict (with `query` left as `None`);
        - `table` given as a non-empty list of such dicts (with `query` left as `None`);
        - `table` given as a `Dataset` or a generator, which is returned untouched.

        Returns:
            A list shaped like:
            [
                {"table": pd.DataFrame, "query": list[str]},
                ...,
                {"table": pd.DataFrame, "query": list[str]}
            ]
            or the `Dataset` / generator that was passed in. Tables that are not already DataFrames are converted
            with `pd.DataFrame(...)`; the conversion is stored in a shallow copy of the entry so that the caller's
            dictionaries are left unmodified.

        Raises:
            ValueError: if `table` is `None`, if the input has an unsupported type, or if a dictionary lacks a
                non-`None` `table` or `query` entry.
        """
        requires_backends(self, "pandas")
        import pandas as pd

        if table is None:
            raise ValueError("Keyword argument `table` cannot be None.")
        elif query is None:
            if isinstance(table, dict) and table.get("query") is not None and table.get("table") is not None:
                tqa_pipeline_inputs = [table]
            elif isinstance(table, list) and len(table) > 0:
                if not all(isinstance(d, dict) for d in table):
                    # Materialize the types as a list: interpolating a generator expression would only print
                    # `<generator object ...>`.
                    raise ValueError(
                        f"Keyword argument `table` should be a list of dict, but is {[type(d) for d in table]}"
                    )

                # Validate every entry (not only the first one) so that a malformed entry is reported here with a
                # clear message instead of surfacing later as a bare KeyError.
                for position, entry in enumerate(table):
                    if entry.get("query") is None or entry.get("table") is None:
                        raise ValueError(
                            "If keyword argument `table` is a list of dictionaries, each dictionary should have a"
                            f" `table` and `query` key, but the dictionary at index {position} has keys"
                            f" {list(entry.keys())} (both values must also be different from None)."
                        )
                tqa_pipeline_inputs = table
            elif (Dataset is not None and isinstance(table, Dataset)) or isinstance(table, types.GeneratorType):
                # Lazily evaluated inputs are passed through untouched.
                return table
            else:
                raise ValueError(
                    "Invalid input. Keyword argument `table` should be either of type `dict` or `list`, but "
                    f"is {type(table)}"
                )
        else:
            tqa_pipeline_inputs = [{"table": table, "query": query}]

        normalized_inputs = []
        for tqa_pipeline_input in tqa_pipeline_inputs:
            current_table = tqa_pipeline_input["table"]
            if not isinstance(current_table, pd.DataFrame):
                if current_table is None:
                    raise ValueError("Table cannot be None.")
                # Shallow-copy the entry instead of overwriting the "table" key of the caller's dictionary.
                tqa_pipeline_input = {**tqa_pipeline_input, "table": pd.DataFrame(current_table)}
            normalized_inputs.append(tqa_pipeline_input)

        return normalized_inputs
@add_end_docstrings(build_pipeline_init_args(has_tokenizer=True))
class TableQuestionAnsweringPipeline(Pipeline):
    """
    Table Question Answering pipeline using a `ModelForTableQuestionAnswering`. This pipeline is only available in
    PyTorch.

    Unless the model you're using explicitly sets these generation parameters in its configuration files
    (`generation_config.json`), the following default values will be used:
    - max_new_tokens: 256

    Example:

    ```python
    >>> from transformers import pipeline

    >>> oracle = pipeline(model="google/tapas-base-finetuned-wtq")
    >>> table = {
    ...     "Repository": ["Transformers", "Datasets", "Tokenizers"],
    ...     "Stars": ["36542", "4512", "3934"],
    ...     "Contributors": ["651", "77", "34"],
    ...     "Programming language": ["Python", "Python", "Rust, Python and NodeJS"],
    ... }
    >>> oracle(query="How many stars does the transformers repository have?", table=table)
    {'answer': 'AVERAGE > 36542', 'coordinates': [(0, 1)], 'cells': ['36542'], 'aggregator': 'AVERAGE'}
    ```

    Learn more about the basics of using a pipeline in the [pipeline tutorial](../pipeline_tutorial)

    This tabular question answering pipeline can currently be loaded from [`pipeline`] using the following task
    identifier: `"table-question-answering"`.

    The models that this pipeline can use are models that have been fine-tuned on a tabular question answering task.
    See the up-to-date list of available models on
    [huggingface.co/models](https://huggingface.co/models?filter=table-question-answering).
    """

    default_input_names = "table,query"

    _pipeline_calls_generate = True
    _load_processor = False
    _load_image_processor = False
    _load_feature_extractor = False
    _load_tokenizer = True

    # Make sure the docstring is updated when the default generation config is changed
    _default_generation_config = GenerationConfig(
        max_new_tokens=256,
    )

    def __init__(self, args_parser=TableQuestionAnsweringArgumentHandler(), **kwargs):
        """
        Build the pipeline and record which inference path the loaded model needs.

        Args:
            args_parser: Callable that turns the user's call arguments into a list of
                `{"table": ..., "query": ...}` inputs.
            **kwargs: Forwarded unchanged to the base pipeline constructor.
        """
        super().__init__(**kwargs)
        self._args_parser = args_parser

        mapping = MODEL_FOR_TABLE_QUESTION_ANSWERING_MAPPING_NAMES.copy()
        mapping.update(MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING_NAMES)
        self.check_model_type(mapping)

        # Truthy only when the config provides both a non-empty `aggregation_labels` and a non-zero
        # `num_aggregation_labels`.
        self.aggregate = getattr(self.model.config, "aggregation_labels", None) and getattr(
            self.model.config, "num_aggregation_labels", None
        )
        # The mere presence of `aggregation_labels` on the config selects the "tapas" code path; every other model
        # goes through `generate`.
        self.type = "tapas" if hasattr(self.model.config, "aggregation_labels") else None

    def batch_inference(self, **inputs):
        """Run the model once on the whole batch and return its raw outputs."""
        return self.model(**inputs)

    def sequential_inference(self, **inputs):
        """
        Inference used for models that need to process sequences in a sequential fashion, like the SQA models which
        handle conversational query related to a table.

        Each example of the batch is run on its own. From the second example on, column 3 of its token type IDs is
        overwritten with the cells that were predicted as answers for the previous example.

        Returns:
            `(logits,)`, or `(logits, aggregation_logits)` when `self.aggregate` is truthy, each concatenated over
            the batch dimension.
        """
        all_logits = []
        all_aggregations = []
        prev_answers = None
        batch_size = inputs["input_ids"].shape[0]

        input_ids = inputs["input_ids"].to(self.device)
        attention_mask = inputs["attention_mask"].to(self.device)
        token_type_ids = inputs["token_type_ids"].to(self.device)

        for index in range(batch_size):
            input_ids_example = input_ids[index]
            attention_mask_example = attention_mask[index]  # shape (seq_len,)
            token_type_ids_example = token_type_ids[index]  # shape (seq_len, 7)

            # Convert the (segment, column, row) columns to Python values once per example rather than once per
            # token: calling `.tolist()` inside the token loops made them quadratic in the sequence length.
            # Columns 0-2 are never written to below, so the snapshot stays valid for the whole iteration.
            token_coordinates = token_type_ids_example[:, :3].tolist()

            # If sequences have already been processed, the token type IDs will be created according to the previous
            # answer.
            if prev_answers is not None:
                # All examples are slices of the same batched tensor, so the current slice has the same shape and
                # dtype as the previous one.
                model_labels = np.zeros_like(token_type_ids_example[:, 3].cpu().numpy())  # shape (seq_len,)

                for i, (segment_id, col_id, row_id) in enumerate(token_coordinates):
                    # Stored column/row ids are shifted by one; -1 after the shift means "not a table cell".
                    col_id -= 1
                    row_id -= 1
                    if row_id >= 0 and col_id >= 0 and segment_id == 1:
                        model_labels[i] = int(prev_answers[(col_id, row_id)])

                token_type_ids_example[:, 3] = torch.from_numpy(model_labels).type(torch.long).to(self.device)

            outputs = self.model(
                input_ids=input_ids_example.unsqueeze(0),
                attention_mask=attention_mask_example.unsqueeze(0),
                token_type_ids=token_type_ids_example.unsqueeze(0),
            )
            logits = outputs.logits

            if self.aggregate:
                all_aggregations.append(outputs.logits_aggregation)

            all_logits.append(logits)

            dist_per_token = torch.distributions.Bernoulli(logits=logits)
            # Zero out the probabilities of padded positions.
            probabilities = dist_per_token.probs * attention_mask_example.type(torch.float32).to(
                dist_per_token.probs.device
            )

            # Group the per-token probabilities by the table cell they belong to.
            coords_to_probs = collections.defaultdict(list)
            for p, (segment_id, col, row) in zip(probabilities.squeeze().tolist(), token_coordinates):
                col -= 1
                row -= 1
                if col >= 0 and row >= 0 and segment_id == 1:
                    coords_to_probs[(col, row)].append(p)

            # A cell counts as an answer when the mean probability of its tokens exceeds 0.5.
            prev_answers = {key: np.array(coords_to_probs[key]).mean() > 0.5 for key in coords_to_probs}

        logits_batch = torch.cat(tuple(all_logits), 0)

        return (logits_batch,) if not self.aggregate else (logits_batch, torch.cat(tuple(all_aggregations), 0))

    def __call__(self, *args, **kwargs):
        r"""
        Answers queries according to a table. The pipeline accepts several types of inputs which are detailed below:

        - `pipeline(table, query)`
        - `pipeline(table, [query])`
        - `pipeline(table=table, query=query)`
        - `pipeline(table=table, query=[query])`
        - `pipeline({"table": table, "query": query})`
        - `pipeline({"table": table, "query": [query]})`
        - `pipeline([{"table": table, "query": query}, {"table": table, "query": query}])`

        The `table` argument should be a dict or a DataFrame built from that dict, containing the whole table:

        Example:

        ```python
        data = {
            "actors": ["brad pitt", "leonardo di caprio", "george clooney"],
            "age": ["56", "45", "59"],
            "number of movies": ["87", "53", "69"],
            "date of birth": ["7 february 1967", "10 june 1996", "28 november 1967"],
        }
        ```

        This dictionary can be passed in as such, or can be converted to a pandas DataFrame:

        Example:

        ```python
        import pandas as pd

        table = pd.DataFrame.from_dict(data)
        ```

        Args:
            table (`pd.DataFrame` or `Dict`):
                Pandas DataFrame or dictionary that will be converted to a DataFrame containing all the table values.
                See above for an example of dictionary.
            query (`str` or `list[str]`):
                Query or list of queries that will be sent to the model alongside the table.
            sequential (`bool`, *optional*, defaults to `False`):
                Whether to do inference sequentially or as a batch. Batching is faster, but models like SQA require the
                inference to be done sequentially to extract relations within sequences, given their conversational
                nature.
            padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `False`):
                Activates and controls padding. Accepts the following values:

                - `True` or `'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
                  sequence is provided).
                - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum
                  acceptable input length for the model if that argument is not provided.
                - `False` or `'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of different
                  lengths).

            truncation (`bool`, `str` or [`TapasTruncationStrategy`], *optional*, defaults to `False`):
                Activates and controls truncation. Accepts the following values:

                - `True` or `'drop_rows_to_fit'`: Truncate to a maximum length specified with the argument `max_length`
                  or to the maximum acceptable input length for the model if that argument is not provided. This will
                  truncate row by row, removing rows from the table.
                - `False` or `'do_not_truncate'` (default): No truncation (i.e., can output batch with sequence lengths
                  greater than the model maximum admissible input size).


        Return:
            A dictionary or a list of dictionaries containing results: Each result is a dictionary with the following
            keys:

            - **answer** (`str`) -- The answer of the query given the table. If there is an aggregator, the answer will
              be preceded by `AGGREGATOR >`.
            - **coordinates** (`list[tuple[int, int]]`) -- Coordinates of the cells of the answers.
            - **cells** (`list[str]`) -- List of strings made up of the answer cell values.
            - **aggregator** (`str`) -- If the model has an aggregator, this returns the aggregator.
        """
        pipeline_inputs = self._args_parser(*args, **kwargs)

        # `kwargs` is forwarded as well so that `sequential`, `padding` and `truncation` reach
        # `_sanitize_parameters`, which ignores the other keywords.
        results = super().__call__(pipeline_inputs, **kwargs)
        # Unwrap single-element results so that one input yields one result rather than a list.
        if len(results) == 1:
            return results[0]
        return results

    def _sanitize_parameters(self, sequential=None, padding=None, truncation=None, **kwargs):
        """
        Split the call-time keyword arguments into `(preprocess, forward, postprocess)` parameter dicts.

        Only arguments that were explicitly provided (not `None`) are forwarded; the postprocess dict is always
        empty and unknown keywords in `kwargs` are ignored.
        """
        preprocess_params = {}
        if padding is not None:
            preprocess_params["padding"] = padding
        if truncation is not None:
            preprocess_params["truncation"] = truncation

        forward_params = {}
        if sequential is not None:
            forward_params["sequential"] = sequential

        if getattr(self, "assistant_model", None) is not None:
            forward_params["assistant_model"] = self.assistant_model
        if getattr(self, "assistant_tokenizer", None) is not None:
            forward_params["tokenizer"] = self.tokenizer
            forward_params["assistant_tokenizer"] = self.assistant_tokenizer

        return preprocess_params, forward_params, {}

    def preprocess(self, pipeline_input, padding=True, truncation=None):
        """
        Tokenize one `{"table": DataFrame, "query": ...}` input.

        When `truncation` is not given it defaults to `"drop_rows_to_fit"` on the "tapas" path and to
        `"do_not_truncate"` otherwise.

        Returns:
            The tokenizer output with the original table attached under the `"table"` key.

        Raises:
            ValueError: if the table is empty, or if the query is `None` or an empty string.
        """
        if truncation is None:
            if self.type == "tapas":
                truncation = "drop_rows_to_fit"
            else:
                truncation = "do_not_truncate"

        table, query = pipeline_input["table"], pipeline_input["query"]
        if table.empty:
            raise ValueError("table is empty")
        if query is None or query == "":
            raise ValueError("query is empty")
        inputs = self.tokenizer(table, query, return_tensors="pt", truncation=truncation, padding=padding)
        # Carry the table along: `postprocess` needs it to look up the predicted cells.
        inputs["table"] = table
        return inputs

    def _forward(self, model_inputs, sequential=False, **generate_kwargs):
        """
        Run the model on the tokenized inputs.

        On the "tapas" path the model is called directly (example by example when `sequential` is true); otherwise
        `generate` is used with `generate_kwargs`.

        Returns:
            A dict with the keys `"model_inputs"`, `"table"` and `"outputs"`.
        """
        # The table is not a model input; remove it before the remaining entries are unpacked into the model.
        table = model_inputs.pop("table")

        if self.type == "tapas":
            if sequential:
                outputs = self.sequential_inference(**model_inputs)
            else:
                outputs = self.batch_inference(**model_inputs)
        else:
            # User-defined `generation_config` passed to the pipeline call take precedence
            if "generation_config" not in generate_kwargs:
                generate_kwargs["generation_config"] = self.generation_config

            outputs = self.model.generate(**model_inputs, **generate_kwargs)
        model_outputs = {"model_inputs": model_inputs, "table": table, "outputs": outputs}
        return model_outputs

    def postprocess(self, model_outputs):
        """
        Turn the raw model outputs into answer dictionaries.

        On the "tapas" path each answer holds the keys `"answer"`, `"coordinates"` and `"cells"`, plus
        `"aggregator"` when an aggregation label was predicted. Otherwise each answer only holds the decoded text
        under `"answer"`.

        Returns:
            A single answer dict when there is exactly one answer, else the list of answer dicts.

        Raises:
            PipelineException: on the "tapas" path, when no answer could be produced.
        """
        inputs = model_outputs["model_inputs"]
        table = model_outputs["table"]
        outputs = model_outputs["outputs"]
        if self.type == "tapas":
            if self.aggregate:
                logits, logits_agg = outputs[:2]
                predictions = self.tokenizer.convert_logits_to_predictions(inputs, logits, logits_agg)
                answer_coordinates_batch, agg_predictions = predictions
                aggregators = {i: self.model.config.aggregation_labels[pred] for i, pred in enumerate(agg_predictions)}

                # Only predictions different from the "no aggregation" label get an `AGGREGATOR > ` prefix.
                no_agg_label_index = self.model.config.no_aggregation_label_index
                aggregators_prefix = {
                    i: aggregators[i] + " > " for i, pred in enumerate(agg_predictions) if pred != no_agg_label_index
                }
            else:
                logits = outputs[0]
                predictions = self.tokenizer.convert_logits_to_predictions(inputs, logits)
                answer_coordinates_batch = predictions[0]
                aggregators = {}
                aggregators_prefix = {}

            answers = []
            for index, coordinates in enumerate(answer_coordinates_batch):
                cells = [table.iat[coordinate] for coordinate in coordinates]
                aggregator = aggregators.get(index, "")
                aggregator_prefix = aggregators_prefix.get(index, "")
                answer = {
                    "answer": aggregator_prefix + ", ".join(cells),
                    "coordinates": coordinates,
                    "cells": cells,
                }
                if aggregator:
                    answer["aggregator"] = aggregator

                answers.append(answer)

            # Check the collected list, not the last per-example dict: that dict always has at least three keys
            # and is not even bound when the batch of coordinates is empty.
            if not answers:
                raise PipelineException("Table question answering", self.model.name_or_path, "Empty answer")
        else:
            answers = [{"answer": answer} for answer in self.tokenizer.batch_decode(outputs, skip_special_tokens=True)]

        return answers if len(answers) > 1 else answers[0]
|