tensorflow, nlp, huggingface-transformers, nlp-question-answering

How to build a custom question-answering head when using Hugging Face transformers?


Using the TFBertForQuestionAnswering.from_pretrained() function, we get a predefined head on top of BERT, together with a loss function suited to this task.
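For reference, a minimal sketch of that ready-made setup (the checkpoint name is only an example):

from transformers import TFBertForQuestionAnswering

# Predefined QA head on top of BERT; when start_positions/end_positions are
# passed in the call, the model also computes the span loss internally.
qa_model = TFBertForQuestionAnswering.from_pretrained("bert-base-multilingual-cased")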

My question is how to create a custom head without relying on TFAutoModelForQuestionAnswering.from_pretrained().

I want to do this because there is no place where the architecture of the head is explained clearly. By reading the code here we can see the architecture they are using, but I can't be sure I understand their code 100%.
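My reading is that the whole head boils down to roughly the following (a sketch of my understanding, not the exact source):

import tensorflow as tf

# Stock QA head: a single Dense(2) over the token embeddings; the two output
# channels are then split into per-token start and end logits.
sequence_output = tf.random.uniform((1, 256, 768))  # (batch, seq_len, hidden) from BERT
logits = tf.keras.layers.Dense(2, name="qa_outputs")(sequence_output)  # (1, 256, 2)
start_logits, end_logits = tf.split(logits, num_or_size_splits=2, axis=-1)
start_logits = tf.squeeze(start_logits, axis=-1)  # (1, 256)
end_logits = tf.squeeze(end_logits, axis=-1)      # (1, 256)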

How to Fine-tune HuggingFace BERT model for Text Classification is a good starting point. However, it covers only the classification task, which is much simpler.

'start_positions' and 'end_positions' are created following this tutorial.
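In short, they are the token indices of the answer span, derived from character offsets via the tokenizer's offset mapping. A simplified sketch of that preprocessing (not the exact tutorial code):

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-multilingual-cased")

question = "Where do I live?"
context = "I live in Berlin with my family."
answer_start_char = context.index("Berlin")          # character start of the answer
answer_end_char = answer_start_char + len("Berlin")  # character end (exclusive)

enc = tokenizer(question, context, max_length=256, truncation=True,
                padding="max_length", return_offsets_mapping=True)
seq_ids = enc.sequence_ids()

start_position = end_position = 0
for idx, (start, end) in enumerate(enc["offset_mapping"]):
    if seq_ids[idx] != 1:  # only look at context tokens
        continue
    if start <= answer_start_char < end:
        start_position = idx
    if start < answer_end_char <= end:
        end_position = idx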

So far, I've got the following:

import numpy as np
import tensorflow as tf
from transformers import TFAutoModel, create_optimizer

train_dataset
# Dataset({
#     features: ['input_ids', 'token_type_ids', 'attention_mask', 'start_positions', 'end_positions'],
#     num_rows: 99205
# })
train_dataset.set_format(type='tensorflow', columns=['input_ids', 'token_type_ids', 'attention_mask'])
features = {x: train_dataset[x] for x in ['input_ids', 'token_type_ids', 'attention_mask']}
labels = [train_dataset[x] for x in ['start_positions', 'end_positions']]
labels = np.array(labels).T
tfdataset = tf.data.Dataset.from_tensor_slices((features, labels)).batch(16)

input_ids = tf.keras.layers.Input(shape=(256,), dtype=tf.int32, name='input_ids')
token_type_ids = tf.keras.layers.Input(shape=(256,), dtype=tf.int32, name='token_type_ids')
attention_mask = tf.keras.layers.Input((256,), dtype=tf.int32, name='attention_mask')


bert = TFAutoModel.from_pretrained("bert-base-multilingual-cased")
output = bert(input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask).last_hidden_state
output = tf.keras.layers.Dense(2, name="qa_outputs")(output)
model = tf.keras.models.Model(inputs=[input_ids, token_type_ids, attention_mask], outputs=output)


num_train_epochs = 3
num_train_steps = len(tfdataset) * num_train_epochs
optimizer, schedule = create_optimizer(
    init_lr=2e-5,
    num_warmup_steps=0,
    num_train_steps=num_train_steps,
    weight_decay_rate=0.01
)

def qa_loss(labels, logits):
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction=tf.keras.losses.Reduction.NONE
    )
    start_loss = loss_fn(labels[0], logits[0])
    end_loss = loss_fn(labels[1], logits[1])
    return (start_loss + end_loss) / 2.0


model.compile(
    loss=qa_loss,
    optimizer=optimizer
)

model.fit(tfdataset, epochs=num_train_epochs)

And I am getting the following error:

ValueError: `labels.shape` must equal `logits.shape` except for the last dimension. Received: labels.shape=(2,) and logits.shape=(256, 2)

It is complaining about the shape of the labels, which I did not expect, since I am using the SparseCategoricalCrossentropy loss.


Solution

  • For future reference, I found a solution: copy the TFBertForQuestionAnswering class and edit it directly. In the code below, for example, I added one extra dense layer, trained the model as usual, and it worked.

    import tensorflow as tf

    from transformers import TFBertPreTrainedModel
    from transformers import TFBertMainLayer
    from transformers.modeling_tf_utils import TFQuestionAnsweringLoss, get_initializer, input_processing
    from transformers.modeling_tf_outputs import TFQuestionAnsweringModelOutput
    from transformers import BertConfig
    
    class MY_TFBertForQuestionAnswering(TFBertPreTrainedModel, TFQuestionAnsweringLoss):
        # names with a '.' represent the authorized unexpected/missing layers when a TF model is loaded from a PT model
        _keys_to_ignore_on_load_unexpected = [
            r"pooler",
            r"mlm___cls",
            r"nsp___cls",
            r"cls.predictions",
            r"cls.seq_relationship",
        ]
    
        def __init__(self, config: BertConfig, *inputs, **kwargs):
            super().__init__(config, *inputs, **kwargs)
    
            self.num_labels = config.num_labels
    
            self.bert = TFBertMainLayer(config, add_pooling_layer=False, name="bert")
    
            # This is the dense layer I added 
            self.my_dense = tf.keras.layers.Dense(
                units=config.hidden_size,
                kernel_initializer=get_initializer(config.initializer_range),
                name="my_dense",
            )
            self.qa_outputs = tf.keras.layers.Dense(
                units=config.num_labels,
                kernel_initializer=get_initializer(config.initializer_range),
                name="qa_outputs",
            )
    
        def call(
            self,
            input_ids = None,
            attention_mask = None,
            token_type_ids = None,
            position_ids = None,
            head_mask = None,
            inputs_embeds = None,
            output_attentions = None,
            output_hidden_states = None,
            return_dict = None,
            start_positions = None,
            end_positions= None,
            training = False,
            **kwargs,
        ):
            r"""
            start_positions (`tf.Tensor` or `np.ndarray` of shape `(batch_size,)`, *optional*):
                Labels for position (index) of the start of the labelled span for computing the token classification loss.
                Positions are clamped to the length of the sequence (`sequence_length`). Position outside of the sequence
                are not taken into account for computing the loss.
            end_positions (`tf.Tensor` or `np.ndarray` of shape `(batch_size,)`, *optional*):
                Labels for position (index) of the end of the labelled span for computing the token classification loss.
                Positions are clamped to the length of the sequence (`sequence_length`). Position outside of the sequence
                are not taken into account for computing the loss.
            """
            inputs = input_processing(
                func=self.call,
                config=self.config,
                input_ids=input_ids,
                attention_mask=attention_mask,
                token_type_ids=token_type_ids,
                position_ids=position_ids,
                head_mask=head_mask,
                inputs_embeds=inputs_embeds,
                output_attentions=output_attentions,
                output_hidden_states=output_hidden_states,
                return_dict=return_dict,
                start_positions=start_positions,
                end_positions=end_positions,
                training=training,
                kwargs_call=kwargs,
            )
            outputs = self.bert(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                token_type_ids=inputs["token_type_ids"],
                position_ids=inputs["position_ids"],
                head_mask=inputs["head_mask"],
                inputs_embeds=inputs["inputs_embeds"],
                output_attentions=inputs["output_attentions"],
                output_hidden_states=inputs["output_hidden_states"],
                return_dict=inputs["return_dict"],
                training=inputs["training"],
            )
            sequence_output = outputs[0]
    
            # The added dense layer also has to be applied here in the forward pass
            my_logits = self.my_dense(inputs=sequence_output)
            logits = self.qa_outputs(inputs=my_logits)
            start_logits, end_logits = tf.split(value=logits, num_or_size_splits=2, axis=-1)
            start_logits = tf.squeeze(input=start_logits, axis=-1)
            end_logits = tf.squeeze(input=end_logits, axis=-1)
            loss = None
    
            if inputs["start_positions"] is not None and inputs["end_positions"] is not None:
                labels = {"start_position": inputs["start_positions"]}
                labels["end_position"] = inputs["end_positions"]
                loss = self.hf_compute_loss(labels=labels, logits=(start_logits, end_logits))
    
            if not inputs["return_dict"]:
                output = (start_logits, end_logits) + outputs[2:]
                return ((loss,) + output) if loss is not None else output
    
            return TFQuestionAnsweringModelOutput(
                loss=loss,
                start_logits=start_logits,
                end_logits=end_logits,
                hidden_states=outputs.hidden_states,
                attentions=outputs.attentions,
            )
    
        def serving_output(self, output: TFQuestionAnsweringModelOutput) -> TFQuestionAnsweringModelOutput:
            hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None
            attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None
    
            return TFQuestionAnsweringModelOutput(
                start_logits=output.start_logits, end_logits=output.end_logits, hidden_states=hs, attentions=attns
            )