I have a Seq2Seq model subclassed from tensorflow.keras.Model with custom encoder and decoder layers. However, when I run a test script that builds and compiles the model, model.summary() gives:
Model: "retrosynthesis_seq2_seq_model"
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Layer (type) ┃ Output Shape ┃ Param # ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ simple_encoder (SimpleEncoder) │ ? │ 0 (unbuilt) │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ simple_decoder (SimpleDecoder) │ ? │ 0 (unbuilt) │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ enc_state_h (Dense) │ (1, 128) │ 16,512 │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ enc_state_c (Dense) │ (1, 128) │ 16,512 │
└─────────────────────────────────┴────────────────────────┴───────────────┘
Total params: 361,064 (1.38 MB)
Trainable params: 361,064 (1.38 MB)
Non-trainable params: 0 (0.00 B)
Model output shape: (1, 20, 1000)
From what I can tell, I have correctly implemented the build() methods for the encoder and decoder layers, yet they are reported as unbuilt. I think this is then causing a TypeError: Unsupported integer size (0)
serialization error when I attempt to save the model.
I have included the encoder, decoder and Seq2Seq model classes below, along with the test script for replication. I appreciate it's quite a lot of code, but pasting it into a single file and running it is enough to reproduce the error:
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, Layer, Embedding, Bidirectional, LSTM, Dropout
from tensorflow.keras.optimizers import Adam
from typing import Optional, Tuple, Any
"""
Encoder Layer
"""
class SimpleEncoder(Layer):
def __init__(self, vocab_size: int, embedding_dim: int, units: int, dropout_rate: float = 0.2, **kwargs):
super(SimpleEncoder, self).__init__(**kwargs)
self.vocab_size = vocab_size
self.embedding_dim = embedding_dim
self.units = units
self.dropout_rate = dropout_rate
self.embedding = Embedding(input_dim=vocab_size, output_dim=embedding_dim, mask_zero=True, name='simple_embedding')
self.dense = Dense(units, activation='relu', name='simple_dense')
self.dropout = Dropout(dropout_rate, name='simple_dropout')
def build(self, input_shape):
self.embedding.build(input_shape)
embedding_output_shape = self.embedding.compute_output_shape(input_shape)
self.dense.build(embedding_output_shape)
dense_output_shape = self.dense.compute_output_shape(embedding_output_shape)
self.dropout.build(dense_output_shape)
super(SimpleEncoder, self).build(input_shape)
def call(self, inputs: tf.Tensor, training: Optional[bool] = None) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
x = self.embedding(inputs) # Shape: (batch_size, sequence_length, embedding_dim)
encoder_output = self.dense(x) # Shape: (batch_size, sequence_length, units)
encoder_output = self.dropout(encoder_output, training=training)
state_h = tf.zeros_like(encoder_output[:, 0, :]) # Shape: (batch_size, units)
state_c = tf.zeros_like(encoder_output[:, 0, :]) # Shape: (batch_size, units)
return encoder_output, state_h, state_c
def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
return self.embedding.compute_mask(inputs, mask)
def get_config(self) -> dict:
config = super(SimpleEncoder, self).get_config()
config.update({
'vocab_size': self.vocab_size,
'embedding_dim': self.embedding_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'dense': tf.keras.layers.serialize(self.dense),
'dropout': tf.keras.layers.serialize(self.dropout),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'SimpleEncoder':
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['dense'] = tf.keras.layers.deserialize(config['dense'])
config['dropout'] = tf.keras.layers.deserialize(config['dropout'])
return cls(**config)
"""
Decoder Layer
"""
class SimpleDecoder(Layer):
def __init__(
self,
vocab_size: int,
embedding_dim: int,
units: int,
dropout_rate: float = 0.2,
**kwargs
):
super(SimpleDecoder, self).__init__(**kwargs)
self.vocab_size = vocab_size
self.embedding_dim = embedding_dim
self.units = units
self.dropout_rate = dropout_rate
self.embedding = Embedding(
input_dim=vocab_size,
output_dim=embedding_dim,
mask_zero=True,
name='decoder_embedding'
)
self.lstm = LSTM(
units,
return_sequences=True,
return_state=True,
name='decoder_lstm'
)
self.dropout = Dropout(dropout_rate, name='decoder_dropout')
self.dense = Dense(vocab_size, activation='softmax', name='decoder_dense')
def build(self, input_shape):
decoder_input_shape, initial_states_shape = input_shape
self.embedding.build(decoder_input_shape)
embedding_output_shape = self.embedding.compute_output_shape(decoder_input_shape)
self.lstm.build(embedding_output_shape)
lstm_output_shape = self.lstm.compute_output_shape(embedding_output_shape)
self.dropout.build(lstm_output_shape)
dropout_output_shape = self.dropout.compute_output_shape(lstm_output_shape)
self.dense.build(dropout_output_shape)
super(SimpleDecoder, self).build(input_shape)
def call(
self,
inputs: Tuple[tf.Tensor, tuple[tf.Tensor, tf.Tensor]],
training: Optional[bool] = None,
mask: Optional[tf.Tensor] = None
) -> tf.Tensor:
decoder_input, initial_state = inputs
if decoder_input is None or initial_state is None:
raise ValueError('decoder_input and initial_state must be provided to the Decoder.')
x = self.embedding(decoder_input)
lstm_output, state_h, state_c = self.lstm(
x,
initial_state=initial_state,
training=training,
mask=None
)
lstm_output = self.dropout(lstm_output, training=training)
output = self.dense(lstm_output)
return output
@staticmethod
def compute_mask(inputs: Tuple, mask: Optional[tf.Tensor] = None) -> None:
return None
def get_config(self) -> dict:
config = super(SimpleDecoder, self).get_config()
config.update({
'vocab_size': self.vocab_size,
'embedding_dim': self.embedding_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'lstm': tf.keras.layers.serialize(self.lstm),
'dropout': tf.keras.layers.serialize(self.dropout),
'dense': tf.keras.layers.serialize(self.dense),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'SimpleDecoder':
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['lstm'] = tf.keras.layers.deserialize(config['lstm'])
config['dropout'] = tf.keras.layers.deserialize(config['dropout'])
config['dense'] = tf.keras.layers.deserialize(config['dense'])
return cls(**config)
"""
Seq2Seq Model
"""
class RetrosynthesisSeq2SeqModel(Model):
def __init__(self, input_vocab_size: int, output_vocab_size: int, encoder_embedding_dim: int,
decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, *args, **kwargs):
super(RetrosynthesisSeq2SeqModel, self).__init__(*args, **kwargs)
self.units: int = units
self.encoder: SimpleEncoder = SimpleEncoder(
input_vocab_size, encoder_embedding_dim, units, dropout_rate
)
self.decoder: SimpleDecoder = SimpleDecoder(
output_vocab_size, decoder_embedding_dim, units, dropout_rate
)
self.input_vocab_size: int = input_vocab_size
self.output_vocab_size: int = output_vocab_size
self.enc_state_h: Dense = Dense(units, name='enc_state_h')
self.enc_state_c: Dense = Dense(units, name='enc_state_c')
self.encoder_data_processor: Optional[Any] = None
self.decoder_data_processor: Optional[Any] = None
self.dropout_rate: float = dropout_rate
def build(self, input_shape):
encoder_input_shape, decoder_input_shape = input_shape
encoder_dummy = tf.zeros(encoder_input_shape)
decoder_dummy = tf.zeros(decoder_input_shape)
self.call((encoder_dummy, decoder_dummy), training=False)
super(RetrosynthesisSeq2SeqModel, self).build(input_shape)
def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
encoder_input, decoder_input = inputs
encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)
decoder_initial_state_h: tf.Tensor = self.enc_state_h(state_h)
decoder_initial_state_c: tf.Tensor = self.enc_state_c(state_c)
decoder_initial_state: Tuple[tf.Tensor, tf.Tensor] = (decoder_initial_state_h, decoder_initial_state_c)
decoder_inputs = (
decoder_input,
decoder_initial_state
)
encoder_mask: Optional[tf.Tensor] = self.encoder.compute_mask(encoder_input)
output: tf.Tensor = self.decoder.call(
decoder_inputs,
training=training,
mask=encoder_mask
)
return output
def get_config(self) -> dict:
config = super(RetrosynthesisSeq2SeqModel, self).get_config()
config.update({
'units': self.units,
'input_vocab_size': self.input_vocab_size,
'output_vocab_size': self.output_vocab_size,
'encoder_embedding_dim': self.encoder.embedding.output_dim,
'decoder_embedding_dim': self.decoder.embedding.output_dim,
'dropout_rate': self.dropout_rate,
'encoder': tf.keras.layers.serialize(self.encoder),
'decoder': tf.keras.layers.serialize(self.decoder),
'enc_state_h': tf.keras.layers.serialize(self.enc_state_h),
'enc_state_c': tf.keras.layers.serialize(self.enc_state_c)
})
return config
@classmethod
def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
config['encoder'] = tf.keras.layers.deserialize(config['encoder'])
config['decoder'] = tf.keras.layers.deserialize(config['decoder'])
config['enc_state_h'] = tf.keras.layers.deserialize(config['enc_state_h'])
config['enc_state_c'] = tf.keras.layers.deserialize(config['enc_state_c'])
return cls(**config)
"""
Test Script
"""
input_vocab_size = 1000
output_vocab_size = 1000
encoder_embedding_dim = 32
decoder_embedding_dim = 64
units = 128
dropout_rate = 0.2
model = RetrosynthesisSeq2SeqModel(
input_vocab_size=input_vocab_size,
output_vocab_size=output_vocab_size,
encoder_embedding_dim=encoder_embedding_dim,
decoder_embedding_dim=decoder_embedding_dim,
units=units,
dropout_rate=dropout_rate
)
encoder_seq_length = 20
decoder_seq_length = 20
model.build(input_shape=[(1, encoder_seq_length), (1, decoder_seq_length)])
sample_encoder_input = np.random.randint(0, input_vocab_size, size=(1, 20))
sample_decoder_input = np.random.randint(0, output_vocab_size, size=(1, 20))
learning_rate: float = 0.0001
optimizer: Adam = Adam(learning_rate=learning_rate, clipnorm=5.0)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()
output = model([sample_encoder_input, sample_decoder_input])
print("Model output shape:", output.shape)
model.save('minimal_seq2seq_model.keras')
print("Model saved successfully.")
The encoder and decoder were showing as unbuilt because I was invoking their call()
methods directly:
encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)
output: tf.Tensor = self.decoder.call(
decoder_inputs,
training=training,
mask=encoder_mask
)
After changing this to call the layer instances directly (so Keras's __call__ machinery runs), the encoder and decoder layers show as built in the model.summary()
output:
encoder_output, state_h, state_c = self.encoder(encoder_input, training=training)
output: tf.Tensor = self.decoder(
decoder_inputs,
training=training,
mask=encoder_mask
)
From what I can tell, invoking the layer's call()
method directly bypasses Keras's internal mechanisms for tracking and building layers, resulting in them not being properly built and tracked.
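To illustrate (a small standalone toy example of my own, not part of the model above): calling the layer instance runs __call__, which builds the layer first, whereas calling call() directly skips that step and leaves built as False:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Layer

class Wrapper(Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.dense = Dense(4)
    def build(self, input_shape):
        self.dense.build(input_shape)
        super().build(input_shape)
    def call(self, inputs):
        return self.dense(inputs)

x = tf.zeros((1, 8))
a = Wrapper()
a.call(x)          # bypasses __call__, so build() never runs
print(a.built)     # False -> reported as "0 (unbuilt)" by summary()
b = Wrapper()
b(x)               # __call__ builds the layer before running call()
print(b.built)     # True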
However, the TypeError: Unsupported integer size (0)
error when attempting to save the model in .keras
format via model.save()
persists, so there must be a separate serialization issue in my model.
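One thing I still want to double-check on that front (this is just an assumption on my part, not something the traceback confirms): the sub-layers are already recreated in __init__ from the constructor arguments, so serializing them again inside get_config() may be unnecessary and could be what trips up saving. A slimmer config for, e.g., SimpleEncoder might look like this:
def get_config(self) -> dict:
    # Store only the constructor hyperparameters; the sub-layers are
    # rebuilt in __init__ when the layer is re-instantiated from config.
    config = super(SimpleEncoder, self).get_config()
    config.update({
        'vocab_size': self.vocab_size,
        'embedding_dim': self.embedding_dim,
        'units': self.units,
        'dropout_rate': self.dropout_rate,
    })
    return config

@classmethod
def from_config(cls, config: dict) -> 'SimpleEncoder':
    return cls(**config)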