I have the following piece of code to illustrate my problem. Each thread calculates a value locs and then updates the result array. Assume that the update (result[locs] += mask[locs]) is a very slow operation: how can I parallelize it so that it runs in the threads too?
import numpy as np
import time
import concurrent.futures

MAX = 100
SIZE = 500
mask = np.random.randint(0, MAX, (SIZE, SIZE))

def process_image(i):
    start = time.time()
    locs = np.where(mask > i)
    print(f" process_image({i}) took {round(time.time() - start, 2)} secs.")
    return locs

if __name__ == '__main__':
    result = np.zeros((SIZE, SIZE))
    with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
        results = [executor.submit(process_image, i) for i in range(MAX)]
        for f in concurrent.futures.as_completed(results):
            locs = f.result()
            # How do I parallelize this operation, so that each thread's
            # update of the shared result array is threaded too?
            result[locs] += mask[locs]
    print(result)
You're using multithreading, so the result ndarray is naturally visible to all threads.
However, result[locs] += mask[locs] may not be thread-safe, so you need to protect the result object with a Lock.
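To see why, here is a minimal sketch of what that line effectively does (the array names mirror the question's code, but the sizes and threshold are only illustrative): with advanced indexing, the augmented assignment behaves roughly like a separate read, modify, and write-back, so another thread's update can land in between and be overwritten.

import numpy as np

SIZE = 4
mask = np.random.randint(0, 10, (SIZE, SIZE))
result = np.zeros((SIZE, SIZE))
locs = np.where(mask > 5)

# result[locs] += mask[locs] with advanced indexing behaves roughly like:
tmp = result[locs]   # read: advanced indexing returns a copy
tmp += mask[locs]    # modify: add into the private copy
result[locs] = tmp   # write back: updates made by other threads in between are lost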
I assume that, in reality, your implementation of process_image() does some I/O; in that case you could make the following changes:
import numpy as np
import time
import concurrent.futures
from threading import Lock

MAX = 100
SIZE = 500
mask = np.random.randint(0, MAX, (SIZE, SIZE))
LOCK = Lock()

def process_image(i):
    start = time.time()
    locs = np.where(mask > i)
    print(f" process_image({i}) took {round(time.time() - start, 2)} secs.")
    with LOCK:
        # Only one thread at a time may update the shared result array.
        result[locs] += mask[locs]

if __name__ == '__main__':
    result = np.zeros((SIZE, SIZE))
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(process_image, range(MAX))
        # Leaving the with-block waits for all submitted tasks to finish.
    print(result)
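Note that the lock serializes the update itself, so only the np.where() call (and any I/O) runs concurrently; the slow += still executes one thread at a time. If that update dominates, one alternative sketch, assuming (as in your example) that the per-task contributions simply add up, is to let each thread accumulate into its own private array and merge the partial results in the main thread, so no lock is needed at all:

import numpy as np
import concurrent.futures

MAX = 100
SIZE = 500
mask = np.random.randint(0, MAX, (SIZE, SIZE))

def process_image(i):
    # Work on a thread-private array, so the slow update needs no lock.
    partial = np.zeros((SIZE, SIZE))
    locs = np.where(mask > i)
    partial[locs] += mask[locs]
    return partial

if __name__ == '__main__':
    result = np.zeros((SIZE, SIZE))
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # map() yields the partial arrays; only the merge touches the shared result.
        for partial in executor.map(process_image, range(MAX)):
            result += partial
    print(result)

The trade-off is one temporary SIZE x SIZE array per task, so this only pays off when the shared update is genuinely the bottleneck.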