python-3.xmultithreadingmultiprocessingtensorflow2.0keras-2

Tensorflow: How to use a generator for fit() which runs in parallel with multiple processes


I am trying to train a model on a data set which does not fit in my RAM. Therefore I am using a data generator which inherits from tensorflow.keras.utils.Sequence as shown below. This is working. However because I am doing processing on the images my training is CPU bound. When looking in GPU-Z my GPU is only at 10-20% but one of my CPU Cores is at its max.
To solve this I am trying to run the generator in parallel on all my 16 cores. However when I set use_multiprocessing=True in the fit() function the program freezes. And using workers=8 does not speed up the process just produces batches in uneven intervals.
ex.: batch 1-8 is processed immediately than there is some delay and than batch 9-16 is processed.

The code below shows what I am trying to do.

#read the dataset
x, o_y = reader.read_dataset_whole(ETLCharacterGroups.kanji)

#split data into 90/10 percent parts
percentage = round(len(x) / 100 * 80)

x_train = x[:percentage]
x_test = x[percentage:]

y_train = o_y[:percentage]
y_test = o_y[percentage:]
def distort_sample(img : Image) -> (Image, [int], [int]):
    """
    Distort the given image randomly.

    Randomly applies the transformations:
        - rotation
        - shear
        - scale
        - translate
        - sharpen
        - blur

    Returns the distorted image.
    """

    offset, scale = (0, 0), (64, 64)

    t = random.choice(["sine"]) # "rotate", "shear", "scale", 
    f = random.choice(["blur", "sharpen", "smooth"])

    # randomly apply transformations...
    # rotate image
    if("rotate" in t):
        img = img.rotate(random.uniform(-30, 30))
    
    # shear image
    if("shear" in t):
        y_shear = random.uniform(-0.2, 0.2)
        x_shear = random.uniform(-0.2, 0.2)
        img = img.transform(img.size, PImage.AFFINE, (1, x_shear, 0, y_shear, 1, 0))
    
    # scale and translate image
    if("scale" in t):
        #scale the image
        size_x = random.randrange(20, 63)
        size_y = random.randrange(20, 63)
        scale = (size_x, size_y)
        offset = (math.ceil((64 - size_x) / 2), math.ceil((64 - size_y) / 2))
        img = img.resize(scale)

        # put it again on a black background (translated)
        background = PImage.new('L', (64, 64))
        trans_x = random.randrange(0, math.floor((64 - size_x)))
        trans_y = random.randrange(0, math.floor((64 - size_y)))
        offset = (trans_x, trans_y)
        background.paste(img, offset)
        img = background
    
    if("sine" in t):
        t_img = np.array(img)

        A = t_img.shape[0] / 3.0
        w = 2.0 / t_img.shape[1]

        shift = lambda x: random.uniform(0.15, 0.2) * A * np.sin(-2*np.pi*x * w)

        for i in range(t_img.shape[0]):
            t_img[:,i] = np.roll(t_img[:,i], int(shift(i)))

        img = PImage.fromarray(t_img)


    # blur
    if("blur" in f):
        img = img.filter(ImageFilter.GaussianBlur(radius=random.uniform(0.5, 1.2)))

    # sharpen
    if("sharpen" in f):
        img = img.filter(ImageFilter.SHARPEN)
        
    # smooth
    if("smooth" in f):
        img = img.filter(ImageFilter.SMOOTH)

    return img, offset, scale
class DataGenerator(tf.keras.utils.Sequence):

    def __init__(self, x_col, y_col, batch_size, mode="training",  shuffle=True):
        self.batch_size = batch_size
        self.undistorted_images = batch_size // 2
        self.shuffle = shuffle

        self.indices = len(x_col)
        self.x_col = x_col
        self.y_col = y_col

    def __len__(self):
        return self.indices // self.batch_size

    def on_epoch_end(self):
        if(False):
            rng_state = np.random.get_state()
            np.random.shuffle(x)
            np.random.set_state(rng_state)
            np.random.shuffle(o_y)
            
    def __getitem__(self, index):
        X, Y = [], []
        
        for i in range(index * self.undistorted_images, (index+1) * self.undistorted_images):
            base_img = self.x_col[i]
            img = PImage.fromarray(np.uint8(base_img.reshape(64, 64) * 255))
            # distort_sample() creates random variations of an image
            img, *unused = distort_sample(img)

            # add transformed image
            X.append(np.array(img).reshape(64, 64, 1))
            Y.append(self.y_col[i])
            
            # add base image
            X.append(base_img)
            Y.append(self.y_col[i])

        return np.array(X), np.array(Y)
#instantiate generators
training_generator = DataGenerator(x_col = x_train, y_col = y_train, batch_size = 256)
validation_generator = DataGenerator(x_col = x_test, y_col = y_test, batch_size = 256)
#train the model
hist = model.fit(
    x=training_generator,
    epochs=100,
    validation_data=training_generator,
    max_queue_size=50,
    workers=8,
    #use_multiprocessing=True   <- this freezes the program
)

Solution

  • In the end I needed to make the Data generator use multi processing. To do this, the arrays needed to be stored in shared memory and than used in the sub processes.

    import multiprocessing as mp
    import numpy as np
    from PIL import Image as PImage
    from PIL import ImageFilter
    import random
    import math
    import tensorflow as tf
    
    
    
    shared_dict = {}
    
    
    def distort_sample(img : PImage) -> (PImage, [int], [int]):
        """
        Distort the given image randomly.
        Randomly applies the transformations:
            rotation, shear, scale, translate, 
        Randomly applies the filter:
            sharpen, blur, smooth
        Returns the distorted image.
        """
    
        offset, scale = (0, 0), (64, 64)
    
        t = random.choice(["sine", "rotate", "shear", "scale"])
        f = random.choice(["blur", "sharpen", "smooth"])
    
        # randomly apply transformations...
        # rotate image
        if("rotate" in t):
            img = img.rotate(random.uniform(-15, 15))
        
        # shear image
        if("shear" in t):
            y_shear = random.uniform(-0.2, 0.2)
            x_shear = random.uniform(-0.2, 0.2)
            img = img.transform(img.size, PImage.AFFINE, (1, x_shear, 0, y_shear, 1, 0))
        
        # scale and translate image
        if("scale" in t):
            #scale the image
            size_x = random.randrange(25, 63)
            size_y = random.randrange(25, 63)
            scale = (size_x, size_y)
            offset = (math.ceil((64 - size_x) / 2), math.ceil((64 - size_y) / 2))
            img = img.resize(scale)
    
            # put it again on a black background (translated)
            background = PImage.new('L', (64, 64))
            trans_x = random.randrange(0, math.floor((64 - size_x)))
            trans_y = random.randrange(0, math.floor((64 - size_y)))
            offset = (trans_x, trans_y)
            background.paste(img, offset)
            img = background
        
        if("sine" in t):
            t_img = np.array(img)
    
            A = t_img.shape[0] / 3.0
            w = 2.0 / t_img.shape[1]
    
            shift_factor = random.choice([-1, 1]) * random.uniform(0.15, 0.2)
            shift = lambda x: shift_factor * A * np.sin(-2*np.pi*x * w)
    
            for i in range(t_img.shape[0]):
                t_img[:,i] = np.roll(t_img[:,i], int(shift(i)))
    
            img = PImage.fromarray(t_img)
    
    
        # blur
        if("blur" in f):
            img = img.filter(ImageFilter.GaussianBlur(radius=random.uniform(0.5, 1.2)))
    
        # sharpen
        if("sharpen" in f):
            img = img.filter(ImageFilter.SHARPEN)
            
        # smooth
        if("smooth" in f):
            img = img.filter(ImageFilter.SMOOTH)
    
        return img, offset, scale
    
    def generator_func(start_index, end_index, x_shape, y_shape):
        X, Y = [], []
        
        x_loc = np.frombuffer(shared_dict["x"], dtype="float16").reshape(x_shape)
        y_loc = np.frombuffer(shared_dict["y"], dtype="b").reshape(y_shape)
        
        for i in range(start_index, end_index):
            base_img = x_loc[i]
            img = PImage.fromarray(np.uint8(base_img.reshape(64, 64) * 255))
            img, *unused = distort_sample(img)
    
            # add transformed image
            X.append(np.array(img).reshape(64, 64, 1))
            Y.append(y_loc[i])
            X.append(np.array(img).reshape(64, 64, 1))
            Y.append(y_loc[i])
    
            # add base image
            #X.append(base_img)
            #Y.append(y_loc[i])
            
        return X, Y
    
    def generator_initializer(_x_shared, _y_shared):
        shared_dict["x"] = _x_shared
        shared_dict["y"] = _y_shared
    
    def generator_func(start_index, end_index, x_shape, y_shape):
        X, Y = [], []
        
        x_loc = np.frombuffer(shared_dict["x"], dtype="float16").reshape(x_shape)
        y_loc = np.frombuffer(shared_dict["y"], dtype="b").reshape(y_shape)
        
        for i in range(start_index, end_index):
            base_img = x_loc[i]
            img = PImage.fromarray(np.uint8(base_img.reshape(64, 64) * 255))
            img, *unused = distort_sample(img)
    
            # add transformed image
            X.append(np.array(img).reshape(64, 64, 1))
            Y.append(y_loc[i])
            X.append(np.array(img).reshape(64, 64, 1))
            Y.append(y_loc[i])
    
            # add base image
            #X.append(base_img)
            #Y.append(y_loc[i])
            
        return X, Y
    
    class DataGenerator(tf.keras.utils.Sequence):
    
        def __init__(self, num_samples, batch_size,
                            percentage, mode,
                            x_shared, y_shared,
                            x_np_shape, y_np_shape,
                            processes, shuffle=True):
            self.num_samples = num_samples
            # 50% original images + 50% augmented images 
            self.batch_size = batch_size // 2
            self.percentage = percentage
    
            # an offset to devide the data set into test and train
            self.start_index = 0
            if(mode == "testing"):
                self.start_index = num_samples - (num_samples // 100 * percentage)
            # is this a train or a test generator
            self.mode = mode
            # how many processes should be used for this generator
            self.processes = processes
            # should the arrays be shuffled after each epoch
            self.shuffle = shuffle
    
            self.x_np_shape = x_np_shape
            self.y_np_shape = y_np_shape
            
            # a pool of processes for generating augmented data
            self.pool = mp.Pool(processes=self.processes,
                initializer=generator_initializer,
                initargs=(x_shared, y_shared))
            
        def __len__(self):
            return (self.num_samples // 100 * self.percentage) // self.batch_size
    
        def on_epoch_end(self):
            if(False):
                rng_state = np.random.get_state()
                np.random.shuffle(x_np)
                np.random.set_state(rng_state)
                np.random.shuffle(y_np)
                
        def __getitem__(self, index):
    
            arguments = []
            slice_size = self.batch_size // self.processes
            current_batch = index * self.batch_size
            for i in range(self.processes):
                slice_start = self.start_index + (current_batch + i * slice_size)
                slice_end = self.start_index + (current_batch + (i+1) * slice_size)
                arguments.append([slice_start, slice_end, self.x_np_shape, self.y_np_shape])
            
            return_values = self.pool.starmap(generator_func, arguments)
    
            X, Y = [], []
            for imgs, labels in return_values:
                X.append(imgs)
                Y.append(labels)
    
            return np.concatenate(X).astype(np.float16), np.concatenate(Y).astype(np.float16)