I am using Python 3.5.1. I want to parallelise a loop that plots a set of arrays using imshow. The minimal code, without any parallelisation, is as follows:
import matplotlib.pyplot as plt
import numpy as np
# Generate data
arrays = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]
# Loop and plot sequentially
for i in range(len(arrays)):
    # Plot side by side
    figure = plt.figure(figsize=(20, 12))
    ax_1 = figure.add_subplot(1, 2, 1)
    ax_2 = figure.add_subplot(1, 2, 2)
    ax_1.imshow(arrays[i], interpolation='gaussian', cmap='RdBu', vmin=0.5*np.min(arrays[i]), vmax=0.5*np.max(arrays[i]))
    ax_2.imshow(arrays_2[i], interpolation='gaussian', cmap='YlGn', vmin=0.5*np.min(arrays_2[i]), vmax=0.5*np.max(arrays_2[i]))
    plt.savefig('./Figure_{}'.format(i), bbox_inches='tight')
    plt.close()
This code currently lives in a Jupyter notebook, and I would like to do all the processing through the notebook only. While this works well, in reality I have 2500+ arrays, and at approximately one plot per second this takes far too long to complete. What I would like to do is split the computation across N processors so that each processor makes plots for len(arrays)/N of the arrays. As the plots are of the individual arrays themselves, there is no need for the cores to talk to each other during the computation (no sharing).
I have seen that the multiprocessing package is good for problems like this. However, it does not work for my problem, as it seems you can't pass 2D arrays into the function. If I modify my code above like so
# Generate data
arrays = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]
x = list(zip(arrays, arrays_2))
def plot_file(information):
    arrays, arrays_2 = list(information[0]), list(information[1])
    print(np.shape(arrays[0][0]), np.shape(arrays_2[0][0]))
    # Loop and plot sequentially
    for i in range(len(arrays)):
        # Plot side by side
        figure = plt.figure(figsize=(20, 12))
        ax_1 = figure.add_subplot(1, 2, 1)
        ax_2 = figure.add_subplot(1, 2, 2)
        ax_1.imshow(arrays[i], interpolation='gaussian', cmap='RdBu', vmin=0.5*np.min(arrays[i]), vmax=0.5*np.max(arrays[i]))
        ax_2.imshow(arrays_2[i], interpolation='gaussian', cmap='YlGn', vmin=0.5*np.min(arrays_2[i]), vmax=0.5*np.max(arrays_2[i]))
        plt.savefig('./Figure_{}'.format(i), bbox_inches='tight')
        plt.close()
from multiprocessing import Pool
pool = Pool(4)
pool.map(plot_file, x)
then I get the error 'TypeError: Invalid dimensions for image data', and the printout of the array dimensions is now just (2,) rather than (3, 2). Apparently, this is because multiprocessing doesn't/can't handle 2D arrays as inputs.
So I was wondering: how could I parallelise this inside the Jupyter notebook? Could someone please show me how to do this?
EDIT (03/11/2022):
The real problem with my original code was that pool.map(func, args) passes one element of args at a time to func, not the entire list of arrays as I thought. So when I looped over the arrays list inside the function, I was actually looping over the rows of a single array, and trying to imshow an individual row is what produced the error.
Anyway, although this question already has a very good accepted answer, I thought I would provide the code that works using only the multiprocessing package, in case anyone else has the same issue or wants to see how it should be done.
import multiprocessing

import matplotlib.pyplot as plt
import numpy as np

n = 10
arrays_1 = (np.random.rand(256, 256) for x in range(n))
arrays_2 = (np.random.rand(256, 256) for x in range(n))
x = zip(range(n), arrays_1, arrays_2)  # each element of x is a tuple: pool.map(func, args) passes one tuple per call

def plot_file(information):
    # get the name of the process that is working on the current data
    process_name = multiprocessing.current_process().name
    print('Process name {} is plotting'.format(process_name))
    # unpack the elements of the tuple
    index, array_1, array_2 = information
    # plot side by side
    figure = plt.figure(figsize=(20, 12))
    ax_1 = figure.add_subplot(1, 2, 1)
    ax_2 = figure.add_subplot(1, 2, 2)
    ax_1.imshow(array_1, interpolation='gaussian', cmap='RdBu')
    ax_2.imshow(array_2, interpolation='gaussian', cmap='YlGn')
    # save and close
    plt.savefig('./{}'.format(index), bbox_inches='tight')
    plt.close()

if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count() // 4)  # use one quarter of the available processors
    pool.map(plot_file, x)  # distribute the elements of x across the pool, one per call
One easy way to do this would be to use dask.distributed with the multiprocessing engine. I only suggest an external module because dask handles the serialization of objects for you, making this a very simple operation:
import matplotlib
# include this line to allow your processes to plot without a screen
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import dask.distributed
import numpy as np
def plot_file(i, array_1, array_2):
    matplotlib.use('Agg')
    # will be called once for each array "job"
    figure = plt.figure(figsize=(20, 12))
    ax_1 = figure.add_subplot(1, 2, 1)
    ax_2 = figure.add_subplot(1, 2, 2)
    for ax, arr, cmap in [(ax_1, array_1, 'RdBu'), (ax_2, array_2, 'YlGn')]:
        ax.imshow(
            arr,
            interpolation='gaussian',
            cmap=cmap,
            vmin=0.5*np.min(arr),
            vmax=0.5*np.max(arr),
        )
    figure.savefig('./Figure_{}'.format(i), bbox_inches='tight')
    plt.close(figure)
arrays = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]
client = dask.distributed.Client() # uses multiprocessing by default
futures = client.map(plot_file, range(len(arrays)), arrays, arrays_2)
dask.distributed.progress(futures)
Even more efficient, however, would be to generate or prepare your arrays within the mapped task, if possible. This would allow you to carry out your array operations, I/O, etc. in parallel too:
def prep_arrays_and_plot(i):
    array_1 = np.random.rand(3,2)
    array_2 = np.random.rand(3,2)
    plot_file(i, array_1, array_2)
futures = client.map(prep_arrays_and_plot, range(10))
dask.distributed.progress(futures)
At this point, you don't need to pickle anything, so writing with multiprocessing isn't too big a deal. The following script runs just fine:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import multiprocessing
def plot_file(i, array_1, array_2):
    matplotlib.use('Agg')
    # will be called once for each array "job"
    figure = plt.figure(figsize=(20, 12))
    ax_1 = figure.add_subplot(1, 2, 1)
    ax_2 = figure.add_subplot(1, 2, 2)
    for ax, arr, cmap in [(ax_1, array_1, 'RdBu'), (ax_2, array_2, 'YlGn')]:
        ax.imshow(
            arr,
            interpolation='gaussian',
            cmap=cmap,
            vmin=0.5*np.min(arr),
            vmax=0.5*np.max(arr),
        )
    figure.savefig('./Figure_{}'.format(i), bbox_inches='tight')
    plt.close(figure)
def prep_arrays_and_plot(i):
    array_1 = np.random.rand(3,2)
    array_2 = np.random.rand(3,2)
    plot_file(i, array_1, array_2)
def main():
    pool = multiprocessing.Pool(4)
    pool.map(prep_arrays_and_plot, range(10))

if __name__ == "__main__":
    main()
Note that if you're running this from a Jupyter notebook, you cannot simply define the functions in cells and pass them to multiprocessing.Pool. Instead, you must define them in a separate file and import them. This doesn't apply to dask (in fact, it's easier with dask if you define the functions in the notebook).