huggingface-transformers

How to feed a large dataset into a Hugging Face pipeline for inference


# Base checkpoint name; the classification checkpoint name is derived from it below.
MODEL = "bert-base-uncased"

# load the model
# The checkpoint identifier is the base name with a "-text-classification" suffix appended.
model_name = MODEL + '-text-classification'

from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Load the model and its tokenizer from the same checkpoint identifier.
load_model = AutoModelForSequenceClassification.from_pretrained(model_name)
load_tokenizer = AutoTokenizer.from_pretrained(model_name)
from transformers import pipeline
# Build a text-classification pipeline around the explicitly loaded model and tokenizer.
my_pipeline  = pipeline("text-classification", model=load_model, 
                                                tokenizer=load_tokenizer)
# Limit the Spark DataFrame to at most 10000 rows, convert it to pandas,
# and take the "lines" column as a plain Python list.
a = list(df_0.limit(10000).toPandas()["lines"])
# NOTE(review): no truncation argument is passed here, so texts longer than the
# model's maximum sequence length reach the model unshortened.
my_pipeline(a)

Error message:

Token indices sequence length is longer than the specified maximum sequence length for this model (1081 > 512). Running this sequence through the model will result in indexing errors
--------------------------------------------------------------------------- RuntimeError                              Traceback (most recent call last) Input In [26], in <cell line: 1>()
----> 1 b = my_pipeline(a)

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/transformers/pipelines/text_classification.py:138, in TextClassificationPipeline.__call__(self, *args, **kwargs)
    104 def __call__(self, *args, **kwargs):
    105     """
    106     Classify the text(s) given as inputs.
    107     (...)
    136         If `top_k` is used, one such dictionary is returned per label.
    137     """
--> 138     result = super().__call__(*args, **kwargs)
    139     if isinstance(args[0], str) and isinstance(result, dict):
    140         # This pipeline is odd, and return a list when single item is run
    141         return [result]

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/transformers/pipelines/base.py:1032, in Pipeline.__call__(self, inputs, num_workers, batch_size, *args,
**kwargs)    1028 if can_use_iterator:    1029     final_iterator = self.get_iterator(    1030         inputs, num_workers, batch_size, preprocess_params, forward_params, postprocess_params    1031     )
-> 1032     outputs = [output for output in final_iterator]    1033     return outputs    1034 else:

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/transformers/pipelines/base.py:1032, in <listcomp>(.0)    1028 if can_use_iterator:    1029     final_iterator = self.get_iterator(    1030         inputs, num_workers, batch_size, preprocess_params, forward_params, postprocess_params    1031     )
-> 1032     outputs = [output for output in final_iterator]    1033     return outputs    1034 else:

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/transformers/pipelines/pt_utils.py:111, in PipelineIterator.__next__(self)
    108     return self.loader_batch_item()
    110 # We're out of items within a batch
--> 111 item = next(self.iterator)
    112 processed = self.infer(item, **self.params)
    113 # We now have a batch of "inferred things".

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/transformers/pipelines/pt_utils.py:112, in PipelineIterator.__next__(self)
    110 # We're out of items within a batch
    111 item = next(self.iterator)
--> 112 processed = self.infer(item, **self.params)
    113 # We now have a batch of "inferred things".
    114 if self.loader_batch_size is not None:
    115     # Try to infer the size of the batch

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/transformers/pipelines/base.py:959, in Pipeline.forward(self, model_inputs, **forward_params)
    957     with inference_context():
    958         model_inputs = self._ensure_tensor_on_device(model_inputs, device=self.device)
--> 959         model_outputs = self._forward(model_inputs, **forward_params)
    960         model_outputs = self._ensure_tensor_on_device(model_outputs, device=torch.device("cpu"))
    961 else:

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/transformers/pipelines/text_classification.py:163, in TextClassificationPipeline._forward(self, model_inputs)
    162 def _forward(self, model_inputs):
--> 163     return self.model(**model_inputs)

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/torch/nn/modules/module.py:1130, in Module._call_impl(self, *input, **kwargs)    1126 # If we don't have any hooks, we want to skip the rest of the logic in    1127 # this function, and just call forward.    1128 if not (self._backward_hooks or self._forward_hooks or self._forward_pre_hooks or _global_backward_hooks    1129         or
_global_forward_hooks or _global_forward_pre_hooks):
-> 1130     return forward_call(*input, **kwargs)    1131 # Do not call functions when jit is used    1132 full_backward_hooks, non_full_backward_hooks = [], []

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/transformers/models/bert/modeling_bert.py:1556, in BertForSequenceClassification.forward(self, input_ids, attention_mask, token_type_ids, position_ids, head_mask, inputs_embeds, labels, output_attentions, output_hidden_states, return_dict)    1548 r"""    1549 labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):    1550     Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,    1551     config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If    1552     `config.num_labels > 1` a classification loss is computed (Cross-Entropy).    1553 """    1554 return_dict = return_dict if return_dict is not None else self.config.use_return_dict
-> 1556 outputs = self.bert(    1557     input_ids,    1558     attention_mask=attention_mask,    1559     token_type_ids=token_type_ids,    1560     position_ids=position_ids,  1561     head_mask=head_mask,    1562     inputs_embeds=inputs_embeds, 1563     output_attentions=output_attentions,    1564     output_hidden_states=output_hidden_states,    1565     return_dict=return_dict,    1566 )    1568 pooled_output = outputs[1]  1570 pooled_output = self.dropout(pooled_output)

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/torch/nn/modules/module.py:1130, in Module._call_impl(self, *input, **kwargs)    1126 # If we don't have any hooks, we want to skip the rest of the logic in    1127 # this function, and just call forward.    1128 if not (self._backward_hooks or self._forward_hooks or self._forward_pre_hooks or _global_backward_hooks    1129         or
_global_forward_hooks or _global_forward_pre_hooks):
-> 1130     return forward_call(*input, **kwargs)    1131 # Do not call functions when jit is used    1132 full_backward_hooks, non_full_backward_hooks = [], []

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/transformers/models/bert/modeling_bert.py:1011, in BertModel.forward(self, input_ids, attention_mask, token_type_ids, position_ids, head_mask, inputs_embeds, encoder_hidden_states, encoder_attention_mask, past_key_values, use_cache, output_attentions, output_hidden_states, return_dict)    1004 # Prepare head mask if needed    1005 # 1.0 in head_mask indicate we keep the head    1006 # attention_probs has shape bsz x n_heads x N x N    1007 # input head_mask has shape [num_heads] or [num_hidden_layers x num_heads]    1008 # and head_mask is converted to shape [num_hidden_layers x batch x num_heads x seq_length x seq_length]    1009 head_mask = self.get_head_mask(head_mask, self.config.num_hidden_layers)
-> 1011 embedding_output = self.embeddings(    1012     input_ids=input_ids,    1013     position_ids=position_ids,    1014    token_type_ids=token_type_ids,    1015     inputs_embeds=inputs_embeds,    1016     past_key_values_length=past_key_values_length,    1017 )    1018 encoder_outputs = self.encoder(    1019     embedding_output,    1020  attention_mask=extended_attention_mask,    (...)    1028     return_dict=return_dict,    1029 )    1030 sequence_output = encoder_outputs[0]

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/torch/nn/modules/module.py:1130, in Module._call_impl(self, *input, **kwargs)    1126 # If we don't have any hooks, we want to skip the rest of the logic in    1127 # this function, and just call forward.    1128 if not (self._backward_hooks or self._forward_hooks or self._forward_pre_hooks or _global_backward_hooks    1129         or
_global_forward_hooks or _global_forward_pre_hooks):
-> 1130     return forward_call(*input, **kwargs)    1131 # Do not call functions when jit is used    1132 full_backward_hooks, non_full_backward_hooks = [], []

File /nfs/workspaces/virtualenvs/nlpspark/lib/python3.8/site-packages/transformers/models/bert/modeling_bert.py:241, in BertEmbeddings.forward(self, input_ids, token_type_ids, position_ids, inputs_embeds, past_key_values_length)
    239 if self.position_embedding_type == "absolute":
    240     position_embeddings = self.position_embeddings(position_ids)
--> 241     embeddings += position_embeddings
    242 embeddings = self.LayerNorm(embeddings)
    243 embeddings = self.dropout(embeddings)

df_0 is a Spark DataFrame that contains a huge amount of data. My question is how to feed this DataFrame into the pipeline, either all at once or in batches.


Solution

  • The error you get (please always post the full error stack trace in the future) is not caused by the size of a; it is caused by one of the texts exceeding the length your model can handle. Your model can handle up to 512 tokens, so you need to truncate your input; otherwise you will get this error:

    from transformers import pipeline
    # Build a text-classification pipeline directly from a model identifier;
    # its tokenizer is then reachable as my_pipeline.tokenizer.
    my_pipeline  = pipeline("text-classification", model="distilbert-base-uncased-finetuned-sst-2-english")
    
    # Repeat a short phrase 1024 times to get an input far longer than the model can handle.
    te = "This is a long text "*1024
    print(te)
    # Print how many tokens the tokenizer produces for the text.
    print(len(my_pipeline.tokenizer.tokenize(te)))
    # truncation=True lets the over-long input be classified instead of failing.
    my_pipeline(te, truncation=True)
    

    Output:

    This is a long text This is a long text This is a long text This is a long text This is a long text ...
    5120
    [{'label': 'NEGATIVE', 'score': 0.9979830980300903}]
    

    By default, the pipeline object processes a list one sample at a time. You can try to speed up the classification by specifying a batch_size. Note, however, that batching is not necessarily faster; whether it helps depends on the model and the hardware:

    # Ten copies of the long text, used as a list input.
    te_list = [te]*10
    # batch_size=5 requests batches of five inputs; truncation stays enabled
    # because every text in the list is over-long.
    my_pipeline(te_list, batch_size=5, truncation=True,)