Tags: python-3.x, tensorflow, keras, pathos

Error when running TensorFlow models in parallel, although they work fine sequentially


I am trying to run multiple TensorFlow models in parallel using pathos.multiprocessing.Pool.

The error is:

multiprocess.pool.RemoteTraceback:

Traceback (most recent call last):
  File "c:\users\burge\appdata\local\programs\python\python37\lib\site-packages\multiprocess\pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "c:\users\burge\appdata\local\programs\python\python37\lib\site-packages\multiprocess\pool.py", line 44, in mapstar
    return list(map(*args))
  File "c:\users\burge\appdata\local\programs\python\python37\lib\site-packages\pathos\helpers\mp_helper.py", line 15, in <lambda>
    func = lambda args: f(*args)
  File "c:\Users\Burge\Desktop\SwarmMemory\sim.py", line 38, in run
    i.step()
  File "c:\Users\Burge\Desktop\SwarmMemory\agent.py", line 240, in step
    output = self.ai(np.array(self.internal_log).reshape(-1, 1, 9))
  File "c:\users\burge\appdata\local\programs\python\python37\lib\site-packages\tensorflow\python\keras\engine\base_layer.py", line 1012, in __call__
    outputs = call_fn(inputs, *args, **kwargs)
  File "c:\users\burge\appdata\local\programs\python\python37\lib\site-packages\tensorflow\python\keras\engine\sequential.py", line 375, in call
    return super(Sequential, self).call(inputs, training=training, mask=mask)
  File "c:\users\burge\appdata\local\programs\python\python37\lib\site-packages\tensorflow\python\keras\engine\functional.py", line 425, in call
    inputs, training=training, mask=mask)
  File "c:\users\burge\appdata\local\programs\python\python37\lib\site-packages\tensorflow\python\keras\engine\functional.py", line 569, in _run_internal_graph
    assert x_id in tensor_dict, 'Could not compute output ' + str(x)
AssertionError: Could not compute output KerasTensor(type_spec=TensorSpec(shape=(None, 1, 4), dtype=tf.float32, name=None), name='dense_1/BiasAdd:0', description="created by layer 'dense_1'")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "c:\Users\Burge\Desktop\SwarmMemory\sim.py", line 78, in <module>
    p.map(Sim.run, sims)
  File "c:\users\burge\appdata\local\programs\python\python37\lib\site-packages\pathos\multiprocessing.py", line 137, in map
    return _pool.map(star(f), zip(*args)) # chunksize
  File "c:\users\burge\appdata\local\programs\python\python37\lib\site-packages\multiprocess\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "c:\users\burge\appdata\local\programs\python\python37\lib\site-packages\multiprocess\pool.py", line 657, in get
    raise self._value
AssertionError: Could not compute output KerasTensor(type_spec=TensorSpec(shape=(None, 1, 4), dtype=tf.float32, name=None), name='dense_1/BiasAdd:0', description="created by layer 'dense_1'")

The pool is created as follows:

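# Imports assumed (not shown in the original post); Sim is defined earlier in sim.py
from multiprocess import freeze_support
from pathos.multiprocessing import ProcessingPool as Pool
import tensorflow
from tensorflow.keras.layers import Dense, Dropout, Input, LSTM
from tensorflow.keras.models import Sequential

if __name__ == '__main__':
    freeze_support()

    model = Sequential()
    model.add(Input(shape=(1, 9)))
    model.add(LSTM(10, return_sequences=True))
    model.add(Dropout(0.1))
    model.add(LSTM(5))
    model.add(Dropout(0.1))
    model.add(Dense(4))
    model.add(Dense(4))

    models = []
    sims = []

    # One independent copy of the architecture per simulation
    for i in range(6):
        models.append(tensorflow.keras.models.clone_model(model))
        sims.append(Sim(models[-1]))

    p = Pool()
    p.map(Sim.run, sims)  # each Sim (and the model it holds) is sent to a worker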

Basically, each Sim runs a simulation using the model passed to it. After a sim has run, I can apply a fitness function to its results and then apply a genetic algorithm to them.
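A minimal sketch of that evaluate-then-select loop, assuming each candidate can be described by a plain list of floats (a hypothetical stand-in for a model's weights). Only picklable data crosses the process boundary. The `simulate` and `next_generation` functions are hypothetical: `simulate` stands in for `Sim.run`, where real code would rebuild its Keras model from the weights inside the worker instead of receiving the model object.

```python
import random
from multiprocessing import Pool


def simulate(genome):
    """Hypothetical stand-in for Sim.run: score one candidate.

    The genome is a plain list of floats, so it pickles cleanly.
    A real worker would rebuild its model from this data here.
    """
    # Toy fitness: the closer every value is to 0.5, the higher the score.
    return -sum((g - 0.5) ** 2 for g in genome)


def next_generation(population, scores, keep=2, noise=0.1, rng=random):
    """Keep the `keep` best genomes and refill with mutated copies of them."""
    ranked = [g for _, g in sorted(zip(scores, population),
                                   key=lambda pair: pair[0], reverse=True)]
    parents = ranked[:keep]
    children = []
    while len(parents) + len(children) < len(population):
        parent = rng.choice(parents)
        # Mutation: add Gaussian noise to every gene of a parent copy.
        children.append([g + rng.gauss(0.0, noise) for g in parent])
    return parents + children


if __name__ == "__main__":
    rng = random.Random(0)
    population = [[rng.random() for _ in range(9)] for _ in range(6)]
    with Pool() as pool:
        for generation in range(5):
            scores = pool.map(simulate, population)  # evaluated in parallel
            print(generation, max(scores))
            population = next_generation(population, scores, rng=rng)
```

Because the workers only ever receive lists of floats, nothing TensorFlow-specific has to survive pickling.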

More information is on GitHub, under the python-ver branch: https://github.com/HarryBurge/SwarmMemory

EDIT: In case anyone needs to do this in the future: I used keras-pickle-wrapper to make the Keras model picklable, and passed the wrapped model directly to the run method.

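from keras_pickle_wrapper import KerasPickleWrapper

models = []
sims = []

for i in range(6):
    # Wrap each clone so it can be pickled and sent to a worker process
    models.append(KerasPickleWrapper(tensorflow.keras.models.clone_model(model)))
    sims.append(Sim())

p = Pool()
p.map(Sim.run, sims, models)  # calls Sim.run(sims[k], models[k]) for each k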
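The wrapper approach works because the wrapper object controls how it is pickled. As a rough sketch of the general idea (not keras-pickle-wrapper's actual source), a wrapper can store a picklable description of the wrapped object in `__getstate__` and rebuild the real object lazily after unpickling. The `Unpicklable` class, its `lock` attribute and the `PickleWrapper` class below are hypothetical stand-ins; for a Keras model, the description would be something like its saved architecture and weights.

```python
import pickle
import threading


class Unpicklable:
    """Hypothetical stand-in for a Keras model: holds a lock, which pickle rejects."""

    def __init__(self, size):
        self.size = size
        self.lock = threading.Lock()

    def describe(self):
        # Plain, picklable data that is enough to rebuild the object.
        return {"size": self.size}

    @classmethod
    def rebuild(cls, description):
        return cls(description["size"])


class PickleWrapper:
    """Hypothetical wrapper: pickles a description, rebuilds the object on demand."""

    def __init__(self, obj):
        self._obj = obj
        self._description = None

    def __call__(self):
        # After unpickling, rebuild the real object the first time it is needed.
        if self._obj is None:
            self._obj = Unpicklable.rebuild(self._description)
        return self._obj

    def __getstate__(self):
        # Only the description is pickled, never the live object.
        return {"description": self().describe()}

    def __setstate__(self, state):
        self._description = state["description"]
        self._obj = None


wrapped = PickleWrapper(Unpicklable(9))
copy = pickle.loads(pickle.dumps(wrapped))  # works, unlike pickling Unpicklable directly
print(copy().size)  # 9
```

As far as I know, keras-pickle-wrapper is used the same way: inside `Sim.run`, calling the wrapper (for example `model_wrapper()`) returns the underlying Keras model.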

Solution

  • I'm the author of pathos. Whenever you see self._value in the error, what's generally happening is that something you tried to send to another process failed to serialize. The error and traceback are a bit obtuse, admittedly. However, you can check the serialization with dill and determine whether you need to use one of the serialization variants (like dill.settings['recurse'] = True), or whether you need to restructure your code slightly to better accommodate serialization. If the class you are working with is something you can edit, an easy fix is to add a __reduce__ method, or similar, to aid serialization.
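A minimal sketch of both suggestions, using the standard-library pickle so it runs without extra packages; with dill installed, `dill.pickles(obj)` gives a similar yes/no answer, and `dill.detect` has helpers for finding which attribute fails. The `can_pickle` helper and the `Agent`/`ReducibleAgent` classes are hypothetical, chosen only to show the pattern.

```python
import pickle
import threading


def can_pickle(obj):
    """Return True if obj survives a pickle round trip, else print why and return False."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception as exc:  # pickle can raise several error types
        print(f"cannot pickle {type(obj).__name__}: {exc}")
        return False


class Agent:
    """Hypothetical class whose lock attribute blocks pickling."""

    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()


class ReducibleAgent(Agent):
    """Same class plus __reduce__: pickle only the constructor arguments."""

    def __reduce__(self):
        # Unpickling calls ReducibleAgent(self.name), which creates a fresh lock.
        return (ReducibleAgent, (self.name,))


print(can_pickle(Agent("a")))           # prints the error, then False
print(can_pickle(ReducibleAgent("a")))  # True
```

Running a check like this on each `Sim` in the parent process, before calling `p.map`, separates serialization problems from errors raised inside the worker.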