I have a subclassed tensorflow.keras.Model Seq2Seq model with custom layers. However, when I try to save it via the tensorflow.keras.Model.save() method, it throws the following error:
File "/home/anaconda3/envs/aizynth-env/lib/python3.10/site-packages/h5py/_hl/dataset.py", line 86, in make_new_dset
tid = h5t.py_create(dtype, logical=1)
File "h5py/h5t.pyx", line 1663, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1687, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1705, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1459, in h5py.h5t._c_int
TypeError: Unsupported integer size (0)
Process finished with exit code 1
From what I understand, this issue stems from the HDF5 format trying to serialize a layer or configuration parameter that it doesn't recognise or cannot handle. My TensorFlow version is 2.17.0.
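To try to narrow this down, a quick check along these lines (my own rough sketch; model here is the compiled model from the script at the end) can report which entries of get_config() plain JSON can't encode, which is my working assumption about what trips up the HDF5 writer:

import json

def find_unserializable(node, path='config'):
    # Recursively walk the config dict and report any leaf value json can't encode
    if isinstance(node, dict):
        for key, value in node.items():
            find_unserializable(value, f"{path}[{key!r}]")
    elif isinstance(node, (list, tuple)):
        for i, value in enumerate(node):
            find_unserializable(value, f"{path}[{i}]")
    else:
        try:
            json.dumps(node)
        except TypeError:
            print(f"{path}: {type(node).__name__} = {node!r}")

find_unserializable(model.get_config())

The custom layers, the model, and a minimal script that reproduces the error are below: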
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, Bidirectional, LSTM, Dropout
from typing import Tuple, Optional
class StackedBidirectionalLSTMEncoder(Layer):
def __init__(self, vocab_size: int, encoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, **kwargs):
super(StackedBidirectionalLSTMEncoder, self).__init__(**kwargs)
self.units: int = units
self.embedding: Embedding = Embedding(vocab_size, encoder_embedding_dim, mask_zero=True)
self.dropout_rate: float = dropout_rate
self.bidirectional_lstm_1: Bidirectional = Bidirectional(
LSTM(units, return_sequences=True, return_state=True),
name='bidirectional_lstm_1'
)
self.dropout_1: Dropout = Dropout(dropout_rate, name='encoder_dropout_1')
self.bidirectional_lstm_2: Bidirectional = Bidirectional(
LSTM(units, return_sequences=True, return_state=True),
name='bidirectional_lstm_2'
)
self.dropout_2: Dropout = Dropout(dropout_rate, name='encoder_dropout_2')
    def call(self, encoder_input: tf.Tensor, training: Optional[bool] = None) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
# Embed the input and obtain mask
encoder_output: tf.Tensor = self.embedding(encoder_input)
mask = self.embedding.compute_mask(encoder_input)
# Process through encoder layers
# First LSTM layer
encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_1(
encoder_output, mask=mask, training=training
)
# Concatenate forward and backward states
state_h_1: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
state_c_1: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)
# Apply dropout
        encoder_output: tf.Tensor = self.dropout_1(encoder_output, training=training)
# Second LSTM layer
encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_2(
encoder_output, mask=mask, training=training
)
# Concatenate forward and backward states
state_h_2: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
state_c_2: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)
# Apply dropout
encoder_output: tf.Tensor = self.dropout_2(encoder_output, training=training)
# Final states
final_state_h: tf.Tensor = state_h_2
final_state_c: tf.Tensor = state_c_2
return encoder_output, final_state_h, final_state_c
def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
return self.embedding.compute_mask(inputs, mask)
def get_config(self) -> dict:
config = super(StackedBidirectionalLSTMEncoder, self).get_config()
config.update({
'vocab_size': self.embedding.input_dim,
'encoder_embedding_dim': self.embedding.output_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'bidirectional_lstm_1': tf.keras.layers.serialize(self.bidirectional_lstm_1),
'dropout_1': tf.keras.layers.serialize(self.dropout_1),
'bidirectional_lstm_2': tf.keras.layers.serialize(self.bidirectional_lstm_2),
'dropout_2': tf.keras.layers.serialize(self.dropout_2),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'StackedBidirectionalLSTMEncoder':
# Deserialize layers
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['bidirectional_lstm_1'] = tf.keras.layers.deserialize(config['bidirectional_lstm_1'])
config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
config['bidirectional_lstm_2'] = tf.keras.layers.deserialize(config['bidirectional_lstm_2'])
config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
return cls(**config)
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, LSTM, Dropout, Dense
from typing import List, Optional, Tuple, Union, Any
class StackedLSTMDecoder(Layer):
def __init__(self, vocab_size: int, decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2,
**kwargs) -> None:
super(StackedLSTMDecoder, self).__init__(**kwargs)
self.units: int = units
self.embedding: Embedding = Embedding(vocab_size, decoder_embedding_dim, mask_zero=True)
self.vocab_size: int = vocab_size
self.dropout_rate: float = dropout_rate
# Decoder: 4-layer LSTM without internal Dropout
# Define LSTM and Dropout layers individually
self.lstm_decoder_1: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_1'
)
self.dropout_1: Dropout = Dropout(dropout_rate, name='decoder_dropout_1')
self.lstm_decoder_2: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_2'
)
self.dropout_2: Dropout = Dropout(dropout_rate, name='decoder_dropout_2')
self.lstm_decoder_3: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_3'
)
self.dropout_3: Dropout = Dropout(dropout_rate, name='decoder_dropout_3')
self.lstm_decoder_4: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_4'
)
self.dropout_4: Dropout = Dropout(dropout_rate, name='decoder_dropout_4')
# Attention Mechanism
self.attention: BahdanauAttention = BahdanauAttention(units=units)
# Output layer
self.dense: Dense = Dense(vocab_size, activation='softmax')
def call(self, inputs: Tuple[tf.Tensor, List[tf.Tensor], tf.Tensor], training: Optional[bool] = None,
mask: Optional[tf.Tensor] = None) -> tf.Tensor:
# Extract initial state and encoder output from inputs
decoder_input, initial_state, encoder_output = inputs
if decoder_input is None or initial_state is None or encoder_output is None:
raise ValueError('decoder_input, initial_state and encoder_output must be provided to the Decoder.')
# Embed the input and extract decoder mask
decoder_output: tf.Tensor = self.embedding(decoder_input)
decoder_mask: Optional[tf.Tensor] = self.embedding.compute_mask(decoder_input)
# Process through decoder layers
# First LSTM layer with initial state
decoder_output, _, _ = self.lstm_decoder_1(
decoder_output,
mask=decoder_mask,
initial_state=initial_state,
training=training
)
decoder_output: tf.Tensor = self.dropout_1(decoder_output, training=training)
# Second LSTM layer
decoder_output, _, _ = self.lstm_decoder_2(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_2(decoder_output, training=training)
# Third LSTM layer
decoder_output, _, _ = self.lstm_decoder_3(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_3(decoder_output, training=training)
# Fourth LSTM layer
decoder_output, final_state_h, final_state_c = self.lstm_decoder_4(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_4(decoder_output, training=training)
# Extract only the encoder_mask from the mask list
if mask is not None and isinstance(mask, (list, tuple)):
encoder_mask = mask[1]
else:
encoder_mask = mask
# Apply attention
context_vector, attention_weights = self.attention(
inputs=[encoder_output, decoder_output],
mask=encoder_mask
)
# Concatenate decoder outputs and context vector
concat_output: tf.Tensor = tf.concat([decoder_output, context_vector], axis=-1) # (batch_size, seq_len_dec, units + units_enc)
# Generate outputs
decoder_output: tf.Tensor = self.dense(concat_output) # (batch_size, seq_len_dec, vocab_size)
return decoder_output
@staticmethod
def compute_mask(inputs: Any, mask: Optional[Any] = None) -> None:
return None
def get_config(self) -> dict:
config = super(StackedLSTMDecoder, self).get_config()
config.update({
'vocab_size': self.vocab_size,
'decoder_embedding_dim': self.embedding.output_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'lstm_decoder_1': tf.keras.layers.serialize(self.lstm_decoder_1),
'dropout_1': tf.keras.layers.serialize(self.dropout_1),
'lstm_decoder_2': tf.keras.layers.serialize(self.lstm_decoder_2),
'dropout_2': tf.keras.layers.serialize(self.dropout_2),
'lstm_decoder_3': tf.keras.layers.serialize(self.lstm_decoder_3),
'dropout_3': tf.keras.layers.serialize(self.dropout_3),
'lstm_decoder_4': tf.keras.layers.serialize(self.lstm_decoder_4),
'dropout_4': tf.keras.layers.serialize(self.dropout_4),
'attention': tf.keras.layers.serialize(self.attention),
'dense': tf.keras.layers.serialize(self.dense),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'StackedLSTMDecoder':
# Deserialize layers
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['lstm_decoder_1'] = tf.keras.layers.deserialize(config['lstm_decoder_1'])
config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
config['lstm_decoder_2'] = tf.keras.layers.deserialize(config['lstm_decoder_2'])
config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
config['lstm_decoder_3'] = tf.keras.layers.deserialize(config['lstm_decoder_3'])
config['dropout_3'] = tf.keras.layers.deserialize(config['dropout_3'])
config['lstm_decoder_4'] = tf.keras.layers.deserialize(config['lstm_decoder_4'])
config['dropout_4'] = tf.keras.layers.deserialize(config['dropout_4'])
config['attention'] = tf.keras.layers.deserialize(config['attention'])
config['dense'] = tf.keras.layers.deserialize(config['dense'])
return cls(**config)
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense
from attention.attention_interface import AttentionInterface
from typing import List, Optional, Tuple, Union
class BahdanauAttention(Layer):
def __init__(self, units: int, **kwargs):
super(BahdanauAttention, self).__init__(**kwargs)
self.units: int = units
self.attention_dense1: Dense = Dense(units, name='attention_dense1')
self.attention_dense2: Dense = Dense(units, name='attention_dense2')
self.attention_v: Dense = Dense(1, name='attention_v')
self.supports_masking: bool = True
def call(self, inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None,
training: Union[None, bool] = None) -> Tuple[tf.Tensor, tf.Tensor]:
# Unpack inputs
encoder_output, decoder_output = inputs
# Attention Mechanism
# Calculate attention scores
# Expand dimensions to match the shapes for broadcasting
        encoder_output_expanded: tf.Tensor = tf.expand_dims(encoder_output, 1)  # Shape: (batch_size, 1, seq_len_encoder, units*2)
        decoder_output_expanded: tf.Tensor = tf.expand_dims(decoder_output, 2)  # Shape: (batch_size, seq_len_decoder, 1, units)
# Compute the attention scores
score: tf.Tensor = tf.nn.tanh(
self.attention_dense1(encoder_output_expanded) + self.attention_dense2(decoder_output_expanded)
) # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units)
# Apply mask if available
if mask is not None:
# If mask is a list or tuple, both encoder and decoder mask have been passed.
# Extract the encoder mask
if isinstance(mask, (list, tuple)):
encoder_mask: tf.Tensor = mask[0]
else:
encoder_mask = mask
if encoder_mask is not None:
# mask shape: (batch_size, seq_len_encoder)
# Expand mask to match score dimensions
encoder_mask = tf.cast(tf.expand_dims(encoder_mask, 1), dtype=score.dtype) # (batch_size, 1, seq_len_encoder)
encoder_mask = tf.expand_dims(encoder_mask, -1) # (batch_size, 1, seq_len_encoder, 1)
# Add a large negative value to masked positions to nullify their effect after softmax
score += (1.0 - encoder_mask) * -1e9
        attention_weights: tf.Tensor = tf.nn.softmax(
            self.attention_v(score), axis=2
        )  # Shape: (batch_size, seq_len_decoder, seq_len_encoder, 1)
# Compute the context vector
context_vector: tf.Tensor = attention_weights * encoder_output_expanded # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units*2)
context_vector: tf.Tensor = tf.reduce_sum(context_vector, axis=2) # Shape: (batch_size, seq_len_decoder, units*2)
return context_vector, attention_weights
@staticmethod
def compute_mask(inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None) -> None:
# This layer does not propagate the mask further
return None
def get_config(self) -> dict:
config = super(BahdanauAttention, self).get_config()
config.update({
'units': self.units,
'attention_dense1': tf.keras.layers.serialize(self.attention_dense1),
'attention_dense2': tf.keras.layers.serialize(self.attention_dense2),
'attention_v': tf.keras.layers.serialize(self.attention_v),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'BahdanauAttention':
# Deserialize layers
config['attention_dense1'] = tf.keras.layers.deserialize(config['attention_dense1'])
config['attention_dense2'] = tf.keras.layers.deserialize(config['attention_dense2'])
config['attention_v'] = tf.keras.layers.deserialize(config['attention_v'])
return cls(**config)
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from tensorflow.train import Checkpoint, CheckpointManager
from tensorflow.keras.callbacks import Callback
from typing import Optional, Any, Tuple
class RetrosynthesisSeq2SeqModel(Model):
def __init__(self, input_vocab_size: int, output_vocab_size: int, encoder_embedding_dim: int,
decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, *args, **kwargs):
super(RetrosynthesisSeq2SeqModel, self).__init__(*args, **kwargs)
# Save the number of units (neurons)
self.units: int = units
# Encoder layer
self.encoder: StackedBidirectionalLSTMEncoder = StackedBidirectionalLSTMEncoder(
input_vocab_size, encoder_embedding_dim, units, dropout_rate
)
# Decoder layer
self.decoder: StackedLSTMDecoder = StackedLSTMDecoder(
output_vocab_size, decoder_embedding_dim, units, dropout_rate
)
# Save the vocabulary sizes
self.input_vocab_size: int = input_vocab_size
self.output_vocab_size: int = output_vocab_size
# Mapping encoder final states to decoder initial states
self.enc_state_h: Dense = Dense(units, name='enc_state_h')
self.enc_state_c: Dense = Dense(units, name='enc_state_c')
# Store the data processors (to be set externally)
self.encoder_data_processor: Optional[Any] = None
self.decoder_data_processor: Optional[Any] = None
# Save the dropout rate
self.dropout_rate: float = dropout_rate
def build(self, input_shape):
# Define the input shapes for encoder and decoder
encoder_input_shape, decoder_input_shape = input_shape
# Pass a dummy input through encoder and decoder to initialize weights
encoder_dummy = tf.zeros(encoder_input_shape)
decoder_dummy = tf.zeros(decoder_input_shape)
# Forward pass to build the model
self.call((encoder_dummy, decoder_dummy), training=False)
# Mark the model as built
super(RetrosynthesisSeq2SeqModel, self).build(input_shape)
def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
"""
Forward pass of the Seq2Seq model.
Args:
inputs (Tuple[tf.Tensor, tf.Tensor]): Tuple containing encoder and decoder inputs.
training (Optional[bool], optional): Training flag. Defaults to None.
Returns:
tf.Tensor: The output predictions from the decoder.
"""
# Extract encoder and decoder inputs
encoder_input, decoder_input = inputs
# Encoder
encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)
# Map encoder final states to decoder initial states
decoder_initial_state_h: tf.Tensor = self.enc_state_h(state_h) # (batch_size, units)
decoder_initial_state_c: tf.Tensor = self.enc_state_c(state_c) # (batch_size, units)
decoder_initial_state: Tuple[tf.Tensor, tf.Tensor] = (decoder_initial_state_h, decoder_initial_state_c)
# Prepare decoder inputs as a tuple
decoder_inputs: Tuple[tf.Tensor, Tuple[tf.Tensor, tf.Tensor], tf.Tensor] = (
decoder_input,
decoder_initial_state,
encoder_output
)
# Extract encoder mask
encoder_mask: Optional[tf.Tensor] = self.encoder.compute_mask(encoder_input)
# Decoder
output: tf.Tensor = self.decoder.call(
decoder_inputs,
training=training,
mask=encoder_mask
)
return output
def get_config(self) -> dict:
config = super(RetrosynthesisSeq2SeqModel, self).get_config()
config.update({
'units': self.units,
'input_vocab_size': self.input_vocab_size,
'output_vocab_size': self.output_vocab_size,
'encoder_embedding_dim': self.encoder.embedding.output_dim,
'decoder_embedding_dim': self.decoder.embedding.output_dim,
'dropout_rate': self.dropout_rate,
'encoder': tf.keras.layers.serialize(self.encoder),
'decoder': tf.keras.layers.serialize(self.decoder),
'enc_state_h': tf.keras.layers.serialize(self.enc_state_h),
'enc_state_c': tf.keras.layers.serialize(self.enc_state_c)
})
return config
@classmethod
def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
# Deserialize layers
config['encoder'] = tf.keras.layers.deserialize(config['encoder'])
config['decoder'] = tf.keras.layers.deserialize(config['decoder'])
config['enc_state_h'] = tf.keras.layers.deserialize(config['enc_state_h'])
config['enc_state_c'] = tf.keras.layers.deserialize(config['enc_state_c'])
return cls(**config)
#!/usr/bin/env python3
import numpy as np
from tensorflow.keras.optimizers import Adam
input_vocab_size = 1000
output_vocab_size = 1000
encoder_embedding_dim = 32
decoder_embedding_dim = 64
units = 128
dropout_rate = 0.2
model = RetrosynthesisSeq2SeqModel(
input_vocab_size=input_vocab_size,
output_vocab_size=output_vocab_size,
encoder_embedding_dim=encoder_embedding_dim,
decoder_embedding_dim=decoder_embedding_dim,
units=units,
dropout_rate=dropout_rate
)
encoder_input_shape = (1, 20) # (batch_size, sequence_length)
decoder_input_shape = (1, 20) # (batch_size, sequence_length)
model.build([encoder_input_shape, decoder_input_shape])
sample_encoder_input = np.random.randint(0, input_vocab_size, size=(1, 20))
sample_decoder_input = np.random.randint(0, output_vocab_size, size=(1, 20))
learning_rate: float = 0.0001
optimizer: Adam = Adam(learning_rate=learning_rate, clipnorm=5.0)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
output = model([sample_encoder_input, sample_decoder_input])
print("Model output shape:", output.shape)
model.save('minimal_seq2seq_model.keras')
print("Model saved successfully.")
I had this issue too, but I wasn't using custom layers. What solved it for me was creating a new conda environment and downgrading to TensorFlow 2.12.0 and h5py 3.6.0.
conda create -n tf_env python=3.10
conda activate tf_env
conda install h5py=3.6.0
conda install tensorflow[and-cuda]
You can specify the TensorFlow version too, but mine defaulted to downloading 2.12.0.
Hope this helps!
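If you'd rather pin the version explicitly instead of relying on the default, conda takes a version spec; this is what the install that worked for me would look like pinned (I've only verified the 2.12.0 build on my machine):
conda install tensorflow=2.12.0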