I am trying to parallelize a loop that takes an infinite generator as input, collects some data, and stops once a certain amount of data has been received.
My implementation is something like this:
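from concurrent.futures import ProcessPoolExecutor
import random

class A:
    # Infinite generator: yields 0, 1, 2, ...
    def __iter__(self):
        i = 0
        while True:
            yield i
            i += 1

def procpar(x):
    r = random.random()
    print('Computing x =', x)
    # Return one or two values, chosen at random
    if r > 0.5:
        return [2 * x]
    else:
        return [2 * x, x ** 2]

with ProcessPoolExecutor(4) as pool:
    out = []
    x = A()
    for res in pool.map(procpar, x):
        out.extend(res)
        # Stop once enough data has been collected
        if len(out) > 100:
            break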
Now, when I run it, I get the following output, after which it just hangs and nothing else happens.
Computing x = 1
Computing x = 6
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5
Looking into what is going on, it seems that the map method tries to unroll the iterator x = A() and generate all of its data up front, so it is stuck in an infinite loop.
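A minimal sketch to check this diagnosis, under a few assumptions: `tracked` and `double` are hypothetical stand-ins for `A` and `procpar`, the input is finite so the script terminates, and `ThreadPoolExecutor` is used only to keep the demo light. The documentation for `Executor.map` states that the iterables are collected immediately rather than lazily, which should apply to `ProcessPoolExecutor` as well.

```python
from concurrent.futures import ThreadPoolExecutor

pulled = []  # records every item the executor takes from the input


def tracked(n):
    """Finite stand-in for the infinite generator; logs each item it yields."""
    for i in range(n):
        pulled.append(i)
        yield i


def double(x):
    """Hypothetical worker function, standing in for procpar."""
    return 2 * x


with ThreadPoolExecutor(2) as pool:
    results = pool.map(double, tracked(10))  # no result requested yet
    consumed_before_first_result = len(pulled)
    first = next(results)

print('items consumed before the first result:', consumed_before_first_result)
print('first result:', first)
```

Here the whole input has already been consumed by the time `map` returns, before any result is requested. With an infinite generator that point is never reached.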
Any suggestions on how to avoid getting stuck in this infinite loop? Of course, I could consume the iterator x in chunks before feeding them to the process pool, but I am wondering whether someone has a better or more straightforward solution.
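For reference, the chunked workaround mentioned above could look roughly like the sketch below. The names `chunked`, `collect`, and `double_and_square` are hypothetical (the last one is a deterministic stand-in for `procpar`), and the chunk size of 8 is an arbitrary choice.

```python
from concurrent.futures import ProcessPoolExecutor
from itertools import count, islice


def chunked(iterable, size):
    """Yield successive lists of up to `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def collect(pool, func, source, limit, chunk_size=8):
    """Map `func` over `source` one finite chunk at a time.

    Stops as soon as more than `limit` items have been collected, so the
    source is never read more than one chunk ahead.
    """
    out = []
    for chunk in chunked(source, chunk_size):
        for res in pool.map(func, chunk):
            out.extend(res)
            if len(out) > limit:
                return out
    return out


def double_and_square(x):
    """Hypothetical deterministic worker, standing in for procpar."""
    return [2 * x, x ** 2]


if __name__ == '__main__':
    with ProcessPoolExecutor(4) as pool:
        out = collect(pool, double_and_square, count(), 100)
    print(len(out))
```

A larger chunk size keeps the workers busier between chunks, at the cost of more tasks being submitted past the point where the limit is reached.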
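Try using multiprocessing.pool.Pool.imap instead. It is a lazier version of map that does not need to exhaust the input iterable before it starts returning results: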
from multiprocessing import Pool
import random

class A:
    def __iter__(self):
        i = 0
        while True:
            yield i
            i += 1

def procpar(x):
    r = random.random()
    print('Computing x =', x)
    if r > 0.5:
        return [2 * x]
    else:
        return [2 * x, x ** 2]

# Required for Windows:
if __name__ == '__main__':
    with Pool(4) as pool:
        out = []
        x = A()
        for res in pool.imap(procpar, x):
            out.extend(res)
            if len(out) > 100:
                break
    print(out)
Prints:
Computing x = 0
Computing x = 1
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5
Computing x = 6
Computing x = 7
Computing x = 8
Computing x = 9
Computing x = 10
Computing x = 11
Computing x = 12
Computing x = 13
Computing x = 14
Computing x = 15
Computing x = 16
Computing x = 17
Computing x = 18
Computing x = 19
Computing x = 20
Computing x = 21
Computing x = 22
Computing x = 23
Computing x = 24
Computing x = 25
Computing x = 26
Computing x = 27
Computing x = 28
Computing x = 29
Computing x = 30
Computing x = 31
Computing x = 32
Computing x = 33
Computing x = 34
Computing x = 35
Computing x = 36
Computing x = 37
Computing x = 38
Computing x = 39
Computing x = 40
Computing x = 41
Computing x = 42
Computing x = 43
Computing x = 44
Computing x = 45
Computing x = 46
Computing x = 47
Computing x = 48
Computing x = 49
Computing x = 50
Computing x = 51
Computing x = 52
Computing x = 53
Computing x = 54
Computing x = 55
Computing x = 56
Computing x = 57
Computing x = 58
Computing x = 59
Computing x = 60
Computing x = 61
Computing x = 62
Computing x = 63
Computing x = 64
[0, 2, 1, 4, 4, 6, 8, 16, 10, 12, 14, 49, 16, 64, 18, 20, 22, 24, 144, 26, 28, 196, 30, 225, 32, 256, 34, 289, 36, 38, 361, 40, 400, 42, 441, 44, 484, 46, 529, 48, 576, 50, 625, 52, 54, 56, 58, 60, 900, 62, 961, 64, 66, 1089, 68, 1156, 70, 1225, 72, 1296, 74, 1369, 76, 78, 1521, 80, 1600, 82, 1681, 84, 1764, 86, 88, 90, 2025, 92, 2116, 94, 96, 2304, 98, 100, 102, 2601, 104, 2704, 106, 2809, 108, 110, 3025, 112, 3136, 114, 116, 3364, 118, 3481, 120, 3600, 122]