tensorflow, keras, tensorflow-datasets, multi-gpu

MirroredStrategy causes IndexError: pop from empty list when using Keras Sequences as model input


While the MirroredStrategy's IndexError: pop from empty list is now infamous and there are numerous possible causes for it, as reported in several related questions, none of those apply to my use case.

In my use case, I'm using Keras Sequence objects to generate the training inputs, as I'm working on large datasets (which would not fit in RAM) with a single known positive class and unknown negatives.
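
For context, this is roughly what such a Sequence looks like; the class below is a simplified stand-in for my actual one, using a memory-mapped file and placeholder labels:

import numpy as np
import tensorflow as tf


class MySequenceObject(tf.keras.utils.Sequence):
    """Simplified stand-in for my actual Sequence."""

    def __init__(self, path: str = "features.npy", batch_size: int = 32):
        # Memory-mapping keeps the (huge) dataset on disk instead of RAM.
        self._X = np.load(path, mmap_mode="r")
        self._batch_size = batch_size

    def __len__(self) -> int:
        return int(np.ceil(len(self._X) / self._batch_size))

    def __getitem__(self, idx: int):
        batch = self._X[idx * self._batch_size:(idx + 1) * self._batch_size]
        # Single known positive class: placeholder all-ones labels.
        return np.asarray(batch), np.ones(len(batch), dtype=bool)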

Following tutorials such as the ones available in the Keras documentation and the TensorFlow documentation, my code looks like the following:


my_training_sequence = MySequenceObject()

if tf.config.list_physical_devices('GPU'):
    # Use all the GPUs visible to TensorFlow
    strategy = tf.distribute.MirroredStrategy()
else:
    # Use the Default Strategy
    strategy = tf.distribute.get_strategy()

with strategy.scope():
    model = CreateMyKerasModel()
    # While in the TensorFlow documentation the compilation step
    # is shown OUTSIDE the scope, in the Keras one it happens
    # within the scope.
    # I have found out that it is NECESSARY to place it inside the scope,
    # as the Keras metrics need to be created in the same strategy scope
    # as the model to work properly.
    model.compile(...)

# Then, OUTSIDE of the scope, run the fit
# which causes the IndexError
model.fit(my_training_sequence)

Any ideas on how to deal with this?


Solution

  • After much pain, I realized that in the Keras Documentation they make use of TensorFlow Dataset objects.

    Now, normal inputs such as vectors are converted to Datasets within the fit process and therefore do not cause problems, but currently Keras does not support the automatic conversion of Keras Sequences into Datasets under the hood. While I do not know why this is, fortunately it is relatively easy to create a method that converts a Sequence into a Dataset.

    Unfortunately, the conversion is dependent on the version of TensorFlow you are using: in recent versions you will want to use TensorSpec objects, while in older ones the combination of TensorFlow data types and TensorShape objects will do.

    In the following example, I will show a high-level approach to writing a Keras Sequence class that can be converted into a Dataset. Afterwards, I will link to all the Keras Sequences I have already implemented in this fashion, as examples for posterity (or for myself, once I forget some of the details of this devilish thing).

    import tensorflow as tf
    import numpy as np
    from packaging import version
    from validate_version_code import validate_version_code
    
    
    def tensorflow_version_is_higher_or_equal_than(tensorflow_version: str) -> bool:
        """Returns boolean if the TensorFlow version is higher than provided one.
    
        Parameters
        ----------------------
        tensorflow_version: str,
            The version of TensorFlow to check against.
    
        Raises
        ----------------------
        ValueError,
            If the provided version code is not a valid one.
    
        Returns
        ----------------------
        Boolean representing whether the installed TensorFlow version is higher than or equal to the given one.
        """
        if not validate_version_code(tensorflow_version):
            raise ValueError(
                (
                    "The provided TensorFlow version code `{}` "
                    "is not a valid version code."
                ).format(tensorflow_version)
            )
        return version.parse(tf.__version__) >= version.parse(tensorflow_version)
    
    
    class ExampleSequence(tf.keras.utils.Sequence):
        """Keras Sequence convertible into a TensorFlow Dataset."""
    
        def __init__(
            self,
            batches_per_epoch: int,
            batch_size: int = 32,
            # Your other parameters go here
        ):
            """Create a new instance of the example sequence.

            Parameters
            --------------------------------
            batches_per_epoch: int
                The number of batches within an epoch.
            batch_size: int = 32
                Size of the batches to generate
                if the size is expected to be CONSTANT;
                otherwise use None if some batches have a different size.
            """
            self._batch_size = batch_size
            self._batches_per_epoch = batches_per_epoch
            # Your other parameters go here
    
        def __len__(self) -> int:
            """Return the number of batches within an epoch."""
            return self._batches_per_epoch

        def __call__(self):
            """Yield the batches of an epoch, one at a time.

            Dataset.from_generator expects a callable returning an
            iterable, so this is implemented as a generator walking
            through the whole epoch rather than returning one batch.
            """
            for idx in range(self._batches_per_epoch):
                yield self[idx]
    
        def into_dataset(self) -> tf.data.Dataset:
            """Return dataset generated out of the current sequence instance.
    
            Implementation details
            ---------------------------------
            This method handles the conversion of this Keras Sequence into
            a TensorFlow Dataset, dispatching to the proper construction
            according to the version of TensorFlow installed on the system.
    
            Returns
            ----------------------------------
            Dataset to be used for the training of a model
            """
    
            ##################################################################
            # Handling Dataset creation when TensorFlow is a modern version. #
            ##################################################################
    
            if tensorflow_version_is_higher_or_equal_than("2.5.0"):
                return tf.data.Dataset.from_generator(
                    self,
                    output_signature=(
                        # Note the trailing comma after the TensorSpec:
                        # the model inputs must form a TUPLE of tensors,
                        # here containing a single element.
                        (
                            tf.TensorSpec(
                                shape=(self._batch_size, 10),
                                dtype=tf.uint32
                            ),
                        ),
                        tf.TensorSpec(
                            shape=(self._batch_size,),
                            dtype=tf.bool
                        )
                    )
                )
    
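            ##################################################################
            # Legacy approach for older TensorFlow versions, combining       #
            # TensorFlow data types and TensorShape objects instead of the   #
            # TensorSpec-based signature.                                    #
            ##################################################################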
            return tf.data.Dataset.from_generator(
                self,
                output_types=(
                    (tf.uint32, ),
                    tf.bool
                ),
                output_shapes=(
                    (tf.TensorShape([self._batch_size, 10]),),
                    tf.TensorShape([self._batch_size, ]),
                )
            )
    
        def __getitem__(self, idx: int):
            """Return batch corresponding to given index.
    
            Parameters
            ---------------
            idx: int,
                Index corresponding to batch to be returned.
    
            Returns
            ---------------
            Return Tuple containing X and Y numpy arrays corresponding to given batch index.
            """
            # Dummy data for this example. Do note that np.random.randint
            # uses `size`, not `shape`, and needs an explicit range.
            X = np.random.randint(0, 100, size=(self._batch_size, 10), dtype=np.uint32)
            y = np.random.randint(0, 2, size=(self._batch_size,)).astype(bool)

            # Please do observe that the input tensors are wrapped into a
            # tuple: the dataset signature declares a TUPLE of inputs
            # (here a single one), so the structure must match exactly.
            return (X,), y
    
    
    

    And then, when you run the fit, you can use:

    model.fit(my_training_sequence.into_dataset())
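
    As one final optional touch (an addition of mine, not something the fix requires): since the Dataset is fed by a Python generator, prefetching batches can help keep the GPUs of the MirroredStrategy busy. On recent TensorFlow versions (older ones expose tf.data.experimental.AUTOTUNE instead) it looks like this:

    model.fit(
        my_training_sequence.into_dataset().prefetch(tf.data.AUTOTUNE)
    )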