python tensorflow machine-learning keras deep-learning

`TypeError: Unsupported integer size (0)` when attempting to save a custom Keras model


I have a subclassed tensorflow.keras.Model Seq2Seq model with custom layers. However, when I try to save it via the tensorflow.keras.Model.save() method, it throws the following error:

  File "/home/anaconda3/envs/aizynth-env/lib/python3.10/site-packages/h5py/_hl/dataset.py", line 86, in make_new_dset
    tid = h5t.py_create(dtype, logical=1)
  File "h5py/h5t.pyx", line 1663, in h5py.h5t.py_create
  File "h5py/h5t.pyx", line 1687, in h5py.h5t.py_create
  File "h5py/h5t.pyx", line 1705, in h5py.h5t.py_create
  File "h5py/h5t.pyx", line 1459, in h5py.h5t._c_int
TypeError: Unsupported integer size (0)

Process finished with exit code 1

From what I understand, this issue stems from the HDF5 format trying to serialize a layer or configuration parameter that it doesn't recognise or cannot handle.

My TensorFlow version is 2.17.0.

Code for Minimal Reproducible Example

Encoder Layer

import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, Bidirectional, LSTM, Dropout
from typing import Tuple, Optional

class StackedBidirectionalLSTMEncoder(Layer):
    """Embedding followed by two stacked bidirectional LSTM layers.

    Each bidirectional LSTM is followed by its own Dropout layer. The layer
    returns the output sequence of the second block together with the hidden
    and cell states of the second bidirectional LSTM, each obtained by
    concatenating the forward and backward states along the last axis.

    Args:
        vocab_size: Input dimension of the embedding.
        encoder_embedding_dim: Output dimension of the embedding.
        units: Number of units of every LSTM (per direction).
        dropout_rate: Rate used by both Dropout layers.
        **kwargs: Forwarded unchanged to ``Layer.__init__``.
    """

    def __init__(self, vocab_size: int, encoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, **kwargs):
        super(StackedBidirectionalLSTMEncoder, self).__init__(**kwargs)
        self.units: int = units
        self.embedding: Embedding = Embedding(vocab_size, encoder_embedding_dim, mask_zero=True)
        self.dropout_rate: float = dropout_rate

        self.bidirectional_lstm_1: Bidirectional = Bidirectional(
            LSTM(units, return_sequences=True, return_state=True),
            name='bidirectional_lstm_1'
        )

        self.dropout_1: Dropout = Dropout(dropout_rate, name='encoder_dropout_1')

        self.bidirectional_lstm_2: Bidirectional = Bidirectional(
            LSTM(units, return_sequences=True, return_state=True),
            name='bidirectional_lstm_2'
        )

        self.dropout_2: Dropout = Dropout(dropout_rate, name='encoder_dropout_2')

    def call(self, encoder_input: tf.Tensor, training: Optional[bool] = None):
        """Encode ``encoder_input``.

        Args:
            encoder_input: Token ids fed to the embedding.
            training: Training flag forwarded to the LSTM and Dropout layers.

        Returns:
            A tuple ``(encoder_output, final_state_h, final_state_c)`` where
            the two states come from the second bidirectional LSTM.
        """
        # Embed the input and obtain the padding mask from the embedding
        encoder_output: tf.Tensor = self.embedding(encoder_input)
        mask = self.embedding.compute_mask(encoder_input)

        # First bidirectional LSTM block; its states are not part of the
        # layer's result, so they are discarded.
        encoder_output, *_ = self.bidirectional_lstm_1(
            encoder_output, mask=mask, training=training
        )
        encoder_output = self.dropout_1(encoder_output, training=training)

        # Second bidirectional LSTM block
        encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_2(
            encoder_output, mask=mask, training=training
        )

        # Concatenate forward and backward states to form the final states
        final_state_h: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
        final_state_c: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)

        encoder_output = self.dropout_2(encoder_output, training=training)

        return encoder_output, final_state_h, final_state_c

    def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
        """Return the mask computed by the embedding for ``inputs``."""
        return self.embedding.compute_mask(inputs, mask)

    def get_config(self) -> dict:
        """Return the constructor arguments needed to re-create this layer.

        Only ``__init__`` parameters are emitted: the sublayers are rebuilt
        by ``__init__`` from these values, so serializing them separately
        would produce keys the constructor cannot accept.
        """
        config = super(StackedBidirectionalLSTMEncoder, self).get_config()
        config.update({
            'vocab_size': self.embedding.input_dim,
            'encoder_embedding_dim': self.embedding.output_dim,
            'units': self.units,
            'dropout_rate': self.dropout_rate,
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'StackedBidirectionalLSTMEncoder':
        """Re-create the layer from a ``get_config`` dictionary.

        Configs written by the previous implementation also contained
        serialized sublayers; those entries are ignored because ``__init__``
        has no matching parameters. The input dictionary is not modified.
        """
        legacy_sublayer_keys = (
            'embedding',
            'bidirectional_lstm_1',
            'dropout_1',
            'bidirectional_lstm_2',
            'dropout_2',
        )
        init_kwargs = {key: value for key, value in config.items() if key not in legacy_sublayer_keys}
        return cls(**init_kwargs)

Decoder Layer

import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, LSTM, Dropout, Dense
from typing import List, Optional, Tuple, Union, Any


class StackedLSTMDecoder(Layer):
    """Four stacked LSTM layers with attention over the encoder output.

    The decoder embeds its input, runs it through four LSTM layers (each
    followed by a Dropout layer), attends over the encoder output with
    ``BahdanauAttention``, concatenates the decoder output with the context
    vector and projects the result with a softmax Dense layer.

    Args:
        vocab_size: Input dimension of the embedding and size of the output
            projection.
        decoder_embedding_dim: Output dimension of the embedding.
        units: Number of units of every LSTM and of the attention layer.
        dropout_rate: Rate used by all Dropout layers.
        **kwargs: Forwarded unchanged to ``Layer.__init__``.
    """

    def __init__(self, vocab_size: int, decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2,
                 **kwargs) -> None:
        super(StackedLSTMDecoder, self).__init__(**kwargs)
        self.units: int = units
        self.embedding: Embedding = Embedding(vocab_size, decoder_embedding_dim, mask_zero=True)
        self.vocab_size: int = vocab_size
        self.dropout_rate: float = dropout_rate

        # Decoder: 4-layer LSTM without internal Dropout
        # Define LSTM and Dropout layers individually
        self.lstm_decoder_1: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_1'
        )
        self.dropout_1: Dropout = Dropout(dropout_rate, name='decoder_dropout_1')

        self.lstm_decoder_2: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_2'
        )
        self.dropout_2: Dropout = Dropout(dropout_rate, name='decoder_dropout_2')

        self.lstm_decoder_3: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_3'
        )
        self.dropout_3: Dropout = Dropout(dropout_rate, name='decoder_dropout_3')

        self.lstm_decoder_4: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_4'
        )
        self.dropout_4: Dropout = Dropout(dropout_rate, name='decoder_dropout_4')

        # Attention Mechanism
        self.attention: BahdanauAttention = BahdanauAttention(units=units)

        # Output layer
        self.dense: Dense = Dense(vocab_size, activation='softmax')

    def call(self, inputs: Tuple[tf.Tensor, List[tf.Tensor], tf.Tensor], training: Optional[bool] = None,
             mask: Optional[tf.Tensor] = None) -> tf.Tensor:
        """Decode one batch.

        Args:
            inputs: ``(decoder_input, initial_state, encoder_output)``.
                ``initial_state`` is used as the initial state of the first
                LSTM layer only.
            training: Training flag forwarded to the LSTM and Dropout layers.
            mask: Encoder mask handed to the attention layer. When a list or
                tuple is given, its element at index 1 is used.

        Returns:
            The output of the softmax projection.

        Raises:
            ValueError: If any of the three entries of ``inputs`` is ``None``.
        """
        # Extract initial state and encoder output from inputs
        decoder_input, initial_state, encoder_output = inputs

        if decoder_input is None or initial_state is None or encoder_output is None:
            raise ValueError('decoder_input, initial_state and encoder_output must be provided to the Decoder.')

        # Embed the input and extract decoder mask
        decoder_output: tf.Tensor = self.embedding(decoder_input)
        decoder_mask: Optional[tf.Tensor] = self.embedding.compute_mask(decoder_input)

        # First LSTM layer is the only one seeded with the initial state
        decoder_output, _, _ = self.lstm_decoder_1(
            decoder_output,
            mask=decoder_mask,
            initial_state=initial_state,
            training=training
        )
        decoder_output = self.dropout_1(decoder_output, training=training)

        # Second LSTM layer
        decoder_output, _, _ = self.lstm_decoder_2(
            decoder_output,
            mask=decoder_mask,
            training=training
        )
        decoder_output = self.dropout_2(decoder_output, training=training)

        # Third LSTM layer
        decoder_output, _, _ = self.lstm_decoder_3(
            decoder_output,
            mask=decoder_mask,
            training=training
        )
        decoder_output = self.dropout_3(decoder_output, training=training)

        # Fourth LSTM layer; its final states are not used by this layer
        decoder_output, _, _ = self.lstm_decoder_4(
            decoder_output,
            mask=decoder_mask,
            training=training
        )
        decoder_output = self.dropout_4(decoder_output, training=training)

        # Extract only the encoder_mask from the mask list
        if mask is not None and isinstance(mask, (list, tuple)):
            encoder_mask = mask[1]
        else:
            encoder_mask = mask

        # Apply attention; the attention weights are not returned by this layer
        context_vector, _ = self.attention(
            inputs=[encoder_output, decoder_output],
            mask=encoder_mask
        )

        # Concatenate decoder outputs and context vector
        concat_output: tf.Tensor = tf.concat([decoder_output, context_vector], axis=-1)  # (batch_size, seq_len_dec, units + units_enc)

        # Generate outputs
        decoder_output = self.dense(concat_output)  # (batch_size, seq_len_dec, vocab_size)

        return decoder_output

    @staticmethod
    def compute_mask(inputs: Any, mask: Optional[Any] = None) -> None:
        """Return ``None``: this layer does not propagate a mask."""
        return None

    def get_config(self) -> dict:
        """Return the constructor arguments needed to re-create this layer.

        Only ``__init__`` parameters are emitted: the sublayers are rebuilt
        by ``__init__`` from these values, so serializing them separately
        would produce keys the constructor cannot accept.
        """
        config = super(StackedLSTMDecoder, self).get_config()
        config.update({
            'vocab_size': self.vocab_size,
            'decoder_embedding_dim': self.embedding.output_dim,
            'units': self.units,
            'dropout_rate': self.dropout_rate,
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'StackedLSTMDecoder':
        """Re-create the layer from a ``get_config`` dictionary.

        Configs written by the previous implementation also contained
        serialized sublayers; those entries are ignored because ``__init__``
        has no matching parameters. The input dictionary is not modified.
        """
        legacy_sublayer_keys = (
            'embedding',
            'lstm_decoder_1',
            'dropout_1',
            'lstm_decoder_2',
            'dropout_2',
            'lstm_decoder_3',
            'dropout_3',
            'lstm_decoder_4',
            'dropout_4',
            'attention',
            'dense',
        )
        init_kwargs = {key: value for key, value in config.items() if key not in legacy_sublayer_keys}
        return cls(**init_kwargs)

Attention Layer

import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense
from attention.attention_interface import AttentionInterface
from typing import List, Optional, Tuple, Union


class BahdanauAttention(Layer):
    """Additive attention of decoder outputs over encoder outputs.

    Scores are computed as ``attention_v(tanh(attention_dense1(encoder) +
    attention_dense2(decoder)))`` and normalized with a softmax over the
    encoder axis.

    Args:
        units: Number of units of the two score projections.
        **kwargs: Forwarded unchanged to ``Layer.__init__``.
    """

    def __init__(self, units: int, **kwargs):
        super(BahdanauAttention, self).__init__(**kwargs)
        self.units: int = units
        self.attention_dense1: Dense = Dense(units, name='attention_dense1')
        self.attention_dense2: Dense = Dense(units, name='attention_dense2')
        self.attention_v: Dense = Dense(1, name='attention_v')
        self.supports_masking: bool = True

    def call(self, inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None,
             training: Union[None, bool] = None) -> Tuple[tf.Tensor, tf.Tensor]:
        """Compute the context vector and the attention weights.

        Args:
            inputs: ``[encoder_output, decoder_output]``.
            mask: Encoder mask. When a list or tuple is given, its first
                element is used.
            training: Unused; accepted for interface compatibility.

        Returns:
            ``(context_vector, attention_weights)``.
        """
        # Unpack inputs
        encoder_output, decoder_output = inputs

        # Expand dimensions to match the shapes for broadcasting
        encoder_output_expanded: tf.Tensor = tf.expand_dims(
            encoder_output, 1)  # Shape: (batch_size, 1, seq_len_encoder, units*2)
        decoder_output_expanded: tf.Tensor = tf.expand_dims(
            decoder_output, 2)  # Shape: (batch_size, seq_len_decoder, 1, units)

        # Compute the attention scores
        score: tf.Tensor = tf.nn.tanh(
            self.attention_dense1(encoder_output_expanded) + self.attention_dense2(decoder_output_expanded)
        )  # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units)

        # Project to one logit per (decoder step, encoder step) pair
        logits: tf.Tensor = self.attention_v(score)  # Shape: (batch_size, seq_len_decoder, seq_len_encoder, 1)

        # If mask is a list or tuple, both encoder and decoder mask have been
        # passed; only the encoder mask is relevant here.
        encoder_mask = mask[0] if isinstance(mask, (list, tuple)) else mask
        if encoder_mask is not None:
            # mask shape: (batch_size, seq_len_encoder)
            # Expand mask to match the logits' dimensions
            encoder_mask = tf.cast(tf.expand_dims(encoder_mask, 1), dtype=logits.dtype)  # (batch_size, 1, seq_len_encoder)
            encoder_mask = tf.expand_dims(encoder_mask, -1)  # (batch_size, 1, seq_len_encoder, 1)
            # The penalty must be applied to the logits, i.e. AFTER the
            # learned `attention_v` projection. Adding it to `score` first
            # lets negative projection weights turn it into a large positive
            # logit, which would make masked positions dominate the softmax.
            logits += (1.0 - encoder_mask) * -1e9

        attention_weights: tf.Tensor = tf.nn.softmax(
            logits, axis=2)  # Shape: (batch_size, seq_len_decoder, seq_len_encoder, 1)

        # Compute the context vector as the attention-weighted sum over the encoder axis
        weighted_encoder_output: tf.Tensor = attention_weights * encoder_output_expanded  # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units*2)
        context_vector: tf.Tensor = tf.reduce_sum(weighted_encoder_output, axis=2)  # Shape: (batch_size, seq_len_decoder, units*2)

        return context_vector, attention_weights

    @staticmethod
    def compute_mask(inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None) -> None:
        """Return ``None``: this layer does not propagate the mask further."""
        return None

    def get_config(self) -> dict:
        """Return the constructor arguments needed to re-create this layer.

        Only ``__init__`` parameters are emitted: the Dense sublayers are
        rebuilt by ``__init__`` from ``units``, so serializing them separately
        would produce keys the constructor cannot accept.
        """
        config = super(BahdanauAttention, self).get_config()
        config.update({
            'units': self.units,
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'BahdanauAttention':
        """Re-create the layer from a ``get_config`` dictionary.

        Configs written by the previous implementation also contained
        serialized sublayers; those entries are ignored because ``__init__``
        has no matching parameters. The input dictionary is not modified.
        """
        legacy_sublayer_keys = ('attention_dense1', 'attention_dense2', 'attention_v')
        init_kwargs = {key: value for key, value in config.items() if key not in legacy_sublayer_keys}
        return cls(**init_kwargs)

Seq2Seq Model

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from tensorflow.train import Checkpoint, CheckpointManager
from tensorflow.keras.callbacks import Callback
from typing import Optional, Any, Tuple

class RetrosynthesisSeq2SeqModel(Model):
    """Seq2Seq model combining the stacked encoder and the attention decoder.

    The encoder's final hidden and cell states are each mapped through a
    Dense layer of size ``units`` and used as the decoder's initial state.

    Args:
        input_vocab_size: Vocabulary size passed to the encoder.
        output_vocab_size: Vocabulary size passed to the decoder.
        encoder_embedding_dim: Embedding dimension of the encoder.
        decoder_embedding_dim: Embedding dimension of the decoder.
        units: Number of units used by the encoder, the decoder and the two
            state-mapping Dense layers.
        dropout_rate: Dropout rate passed to the encoder and the decoder.
        *args: Forwarded unchanged to ``Model.__init__``.
        **kwargs: Forwarded unchanged to ``Model.__init__``.
    """

    def __init__(self, input_vocab_size: int, output_vocab_size: int, encoder_embedding_dim: int,
                 decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, *args, **kwargs):
        super(RetrosynthesisSeq2SeqModel, self).__init__(*args, **kwargs)

        # Save the number of units (neurons)
        self.units: int = units

        # Encoder layer
        self.encoder: StackedBidirectionalLSTMEncoder = StackedBidirectionalLSTMEncoder(
            input_vocab_size, encoder_embedding_dim, units, dropout_rate
        )

        # Decoder layer
        self.decoder: StackedLSTMDecoder = StackedLSTMDecoder(
            output_vocab_size, decoder_embedding_dim, units, dropout_rate
        )

        # Save the vocabulary sizes
        self.input_vocab_size: int = input_vocab_size
        self.output_vocab_size: int = output_vocab_size

        # Mapping encoder final states to decoder initial states
        self.enc_state_h: Dense = Dense(units, name='enc_state_h')
        self.enc_state_c: Dense = Dense(units, name='enc_state_c')

        # Store the data processors (to be set externally)
        self.encoder_data_processor: Optional[Any] = None
        self.decoder_data_processor: Optional[Any] = None

        # Save the dropout rate
        self.dropout_rate: float = dropout_rate

    def build(self, input_shape):
        """Create the weights by running one forward pass on zero tensors.

        Args:
            input_shape: Pair ``(encoder_input_shape, decoder_input_shape)``.
        """
        # Define the input shapes for encoder and decoder
        encoder_input_shape, decoder_input_shape = input_shape

        # Pass a dummy input through encoder and decoder to initialize weights
        encoder_dummy = tf.zeros(encoder_input_shape)
        decoder_dummy = tf.zeros(decoder_input_shape)

        # Forward pass to build the model
        self.call((encoder_dummy, decoder_dummy), training=False)

        # Mark the model as built
        super(RetrosynthesisSeq2SeqModel, self).build(input_shape)

    def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
        """
        Forward pass of the Seq2Seq model.

        Args:
            inputs (Tuple[tf.Tensor, tf.Tensor]): Tuple containing encoder and decoder inputs.
            training (Optional[bool], optional): Training flag. Defaults to None.

        Returns:
            tf.Tensor: The output predictions from the decoder.
        """
        # Extract encoder and decoder inputs
        encoder_input, decoder_input = inputs

        # Encoder
        # NOTE(review): invoking `.call()` directly skips `Layer.__call__`;
        # confirm this is intentional before switching to `self.encoder(...)`.
        encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)

        # Map encoder final states to decoder initial states
        decoder_initial_state_h: tf.Tensor = self.enc_state_h(state_h)  # (batch_size, units)
        decoder_initial_state_c: tf.Tensor = self.enc_state_c(state_c)  # (batch_size, units)
        decoder_initial_state: Tuple[tf.Tensor, tf.Tensor] = (decoder_initial_state_h, decoder_initial_state_c)

        # Prepare decoder inputs as a tuple
        decoder_inputs: Tuple[tf.Tensor, Tuple[tf.Tensor, tf.Tensor], tf.Tensor] = (
            decoder_input,
            decoder_initial_state,
            encoder_output
        )

        # Extract encoder mask
        encoder_mask: Optional[tf.Tensor] = self.encoder.compute_mask(encoder_input)

        # Decoder (same NOTE(review) about `.call()` as for the encoder above)
        output: tf.Tensor = self.decoder.call(
            decoder_inputs,
            training=training,
            mask=encoder_mask
        )

        return output

    def get_config(self) -> dict:
        """Return the constructor arguments needed to re-create this model.

        Only ``__init__`` parameters are emitted: the encoder, the decoder
        and the state-mapping Dense layers are rebuilt by ``__init__`` from
        these values, so serializing them separately would produce keys the
        constructor cannot accept.
        """
        config = super(RetrosynthesisSeq2SeqModel, self).get_config()
        config.update({
            'units': self.units,
            'input_vocab_size': self.input_vocab_size,
            'output_vocab_size': self.output_vocab_size,
            'encoder_embedding_dim': self.encoder.embedding.output_dim,
            'decoder_embedding_dim': self.decoder.embedding.output_dim,
            'dropout_rate': self.dropout_rate,
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
        """Re-create the model from a ``get_config`` dictionary.

        Configs written by the previous implementation also contained
        serialized sublayers; those entries are ignored because ``__init__``
        has no matching parameters. The input dictionary is not modified.
        """
        legacy_sublayer_keys = ('encoder', 'decoder', 'enc_state_h', 'enc_state_c')
        init_kwargs = {key: value for key, value in config.items() if key not in legacy_sublayer_keys}
        return cls(**init_kwargs)

Minimal Reproducible Example Script

#!/usr/bin/env python3

import numpy as np
from tensorflow.keras.optimizers import Adam

# Hyperparameters of the toy model used to reproduce the save error
input_vocab_size, output_vocab_size = 1000, 1000
encoder_embedding_dim, decoder_embedding_dim = 32, 64
units = 128
dropout_rate = 0.2

model = RetrosynthesisSeq2SeqModel(
    input_vocab_size=input_vocab_size,
    output_vocab_size=output_vocab_size,
    encoder_embedding_dim=encoder_embedding_dim,
    decoder_embedding_dim=decoder_embedding_dim,
    units=units,
    dropout_rate=dropout_rate,
)

# Both inputs are laid out as (batch_size, sequence_length)
encoder_input_shape = (1, 20)
decoder_input_shape = (1, 20)

# Create the weights before compiling and calling the model
model.build([encoder_input_shape, decoder_input_shape])

# Random token ids matching the shapes the model was built with
sample_encoder_input = np.random.randint(0, input_vocab_size, size=encoder_input_shape)
sample_decoder_input = np.random.randint(0, output_vocab_size, size=decoder_input_shape)

learning_rate: float = 1e-4
optimizer: Adam = Adam(learning_rate=learning_rate, clipnorm=5.0)

model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# One forward pass, then the save call the question is about
output = model([sample_encoder_input, sample_decoder_input])
print("Model output shape:", output.shape)

model.save('minimal_seq2seq_model.keras')
print("Model saved successfully.")

Solution

  • I had this issue too, but I wasn't using custom layers. What solved it for me was creating a new conda environment and downgrading to TensorFlow 2.12.0 and h5py 3.6.0.

    conda create -n tf_env python=3.10
    conda activate tf_env
    conda install h5py=3.6.0
    conda install tensorflow[and-cuda]
    

    You can specify the TensorFlow version too, but mine defaulted to downloading 2.12.0.

    Hope this helps!