I am trying to mess around with matrices in python, and wanted to use multiprocessing to processes each row separately for a math operation, I have posted a minimal reproducible sample below, but keep in mind that for my actual code I do in-fact need the entire matrix passed to the helper function. This sample takes literally forever to process a 10,000 by 10,000 matrix. Almost 2 hours with 9 processes. Looking in task manage it seems only 4-5 of the threads will run at any given time on my cpu, and the application never uses more than 25%. I've done my absolute best to avoid branches in my real code, though the sample provided is branchless. It still takes roughly 25 seconds to process a 1000 by 1000 matrix on my machine, which is ludacris to me as a mainly c++ developer. I wrote serial code in C that executes the entire 10,000 by 10,000 in constant time in less than a second. I think the main bottleneck is the multiprocessing code, but I am required to do this with multiprocessing. Any ideas for how I could go about improving this? Each row can be processed entirely separately but they need to be joined together back into a matrix for my actual code.
import random
from multiprocessing import Pool
import time
def addMatrixRow(matrixData):
matrix = matrixData[0]
rowNum = matrixData[1]
del (matrixData)
rowSum = 0
for colNum in range(len(matrix[rowNum])):
rowSum += matrix[rowNum][colNum]
return rowSum
def genMatrix(row, col):
matrix = list()
for i in range(row):
matrix.append(list())
for j in range(col):
matrix[i].append(random.randint(0, 1))
return matrix
def main():
matrix = genMatrix(1000, 1000)
print("generated matrix")
MAX_PROCESSES = 4
finalSum = 0
processPool = Pool(processes=MAX_PROCESSES)
poolData = list()
start = time.time()
for i in range(100):
for rowNum in range(len(matrix)):
matrixData = [matrix, rowNum]
poolData.append(matrixData)
finalData = processPool.map(addMatrixRow, poolData)
poolData = list()
finalSum += sum(finalData)
end = time.time()
print(end-start)
print(f'final sum {finalSum}')
if __name__ == '__main__':
main()
Your matrix
has 1000 rows of 1000 elements each and you are summing each row 100 times. By my calculation, that is 100,000 tasks you are submitting to the pool passing a one-million element matrix each time. Ouch!
Now I know you say that the worker function addMatrixRow
must have access to the complete matrix. Fine. But instead of passing it a 100,000 times, you can reduce that to 4 times by initializing each process in the pool with a global variable set to the matrix using the initializer and initargs arguments when you construct the pool. You are able to get away with this because the matrix is read-only.
And instead of creating poolArgs
as a large list you can instead create a generator function that when iterated returns the next argument to be submitted to the pool. But to take advantage of this you cannot use the map
method, which will convert the generator to a list and not save you any memory. Instead use imap_unordered
(rather than imap
since you do not care now in what order your worker function is returning its results because of the commutative law of addition). But with such a large input, you should be using the chunksize argument with imap_unordered
. So that the number of reads and writes to the pool's task queue is greatly reduced(albeit the size of the data being written is larger for each queue operation).
If all of this is somewhat vague to you, I suggest reading the docs thoroughly for class multiprocessing.pool.Pool
and its imap
and imap_unordered
methods.
I have made a few other optimizations replacing for
loops with list comprehensions and using the built-in sum
function.
import random
from multiprocessing import Pool
import time
def init_pool_processes(m):
global matrix
matrix = m
def addMatrixRow(rowNum):
return sum(matrix[rowNum])
def genMatrix(row, col):
return [[random.randint(0, 1) for _ in range(col)] for _ in range(row)]
def compute_chunksize(pool_size, iterable_size):
chunksize, remainder = divmod(iterable_size, 4 * pool_size)
if remainder:
chunksize += 1
return chunksize
def main():
matrix = genMatrix(1000, 1000)
print("generated matrix")
MAX_PROCESSES = 4
processPool = Pool(processes=MAX_PROCESSES, initializer=init_pool_processes, initargs=(matrix,))
start = time.time()
# Use a generator function:
poolData = (rowNum for _ in range(100) for rowNum in range(len(matrix)))
# Compute efficient chunksize
chunksize = compute_chunksize(MAX_PROCESSES, len(matrix) * 100)
finalSum = sum(processPool.imap_unordered(addMatrixRow, poolData, chunksize=chunksize))
end = time.time()
print(end-start)
print(f'final sum {finalSum}')
processPool.close()
processPool.join()
if __name__ == '__main__':
main()
Prints:
generated matrix
0.35799622535705566
final sum 49945400
Note the running time of .36 seconds.
Assuming you have more CPU cores (than 4), use them all for an even greater reduction in time.