Is it possible to interleave generator execution with normal method execution using `ThreadPoolExecutor` from `concurrent.futures`?
import concurrent.futures
from app import executor
def generator():
    """Yield every value produced by the external API call, one at a time."""
    yield from external_api()
def func1():
    """A trivial 'normal' task: immediately return the constant 1."""
    return 1
def func2():
    """A trivial 'normal' task: immediately return the constant 2."""
    return 2
def stream():
    """Submit all three tasks to the shared executor and yield results
    in the order their futures complete.

    NOTE(review): the generator future completes almost immediately (calling
    a generator function only builds an iterator), so its iteration happens
    here and blocks the loop — this is the behavior the question describes.
    """
    generator_future = executor.submit(generator)
    pending = [generator_future, executor.submit(func1), executor.submit(func2)]
    for completed in concurrent.futures.as_completed(pending):
        if completed is generator_future:
            for item in completed.result():
                yield item
        else:
            yield completed.result()
Once generator execution has started, normal method results cannot be yielded in `stream()`, even when `func1` or `func2` has already completed. The normal method results are on hold until the generator has finished yielding. Is it possible to yield the `func1` and `func2` results while the generator is still yielding?
Note: the above snippet runs within a Flask application, using the Flask-Executor wrapper around `ThreadPoolExecutor` to push the application context to the worker threads.
I am having some difficulty in understanding what your problem is or possibly you do not understand what is taking place. We shall find out together:
When you call a generator function, and I don't care how complicated it is inside or how long it takes to yield all its values, it returns almost instantaneously with an iterator that when iterated executes the code within the generator function to get all the elements generated with the yield
statement. The following demonstrates that what the multithreading pool is returning is an Iterator
(also an Iterable
), but you should know that since you are iterating this result:
import concurrent.futures
from collections.abc import Iterator
def generator():
    """Pretend to call a slow API, then yield the integers 3 through 8."""
    import time
    # Simulate an api call:
    time.sleep(.5)
    yield from range(3, 9)
def func1():
    """A trivial 'normal' task: immediately return the constant 1."""
    return 1
def func2():
    """A trivial 'normal' task: immediately return the constant 2."""
    return 2
def stream():
    """Yield the results of three pooled tasks as their futures complete.

    The call to ``generator`` returns almost immediately with an iterator,
    so its future completes first; the slow iteration then happens in this
    thread and blocks the as_completed loop (the point being demonstrated).
    """
    # Use `with` so the pool's worker threads are shut down when streaming
    # finishes (the original created the executor and never released it).
    with concurrent.futures.ThreadPoolExecutor(3) as executor:
        generator_future = executor.submit(generator)
        future1 = executor.submit(func1)
        future2 = executor.submit(func2)
        for future in concurrent.futures.as_completed(
                [generator_future, future1, future2]):
            if future is generator_future:  # identity check, not equality
                generator_result = future.result()
                # Demonstrate that what is returned from calling
                # generator() is an iterator:
                print(isinstance(generator_result, Iterator))
                for value in generator_result:
                    yield value
            else:
                yield future.result()
if __name__ == '__main__':
    # Drain the stream to show the order in which results arrive.
    print(list(stream()))
Prints:
True
[3, 4, 5, 6, 7, 8, 1, 2]
So of the three submitted tasks, which are all relatively trivial, the first one submitted completes first and this one is the call to the generator function. But you then immediately start iterating this iterator and this results in calling your (probably non-trivial) external api that returns multiple values that will be "yielded". You will not be checking the completion for any of the other two submitted tasks to the pool until you have retrieved all the yielded values and this will hold up getting any other results.
Why are you using a generator function? This means that the actual call to the external api will be performed by your main thread and not in one of the pool threads. To overlap the api call with other task submissions, try:
import concurrent.futures
from collections.abc import Iterable
def call_external_api():
    """Simulate a slow external API call; return all of its results at once."""
    import time
    # Simulate an api call:
    time.sleep(.5)
    # Results from the call to the api:
    return list(range(3, 9))
def func1():
    """A trivial 'normal' task: immediately return the constant 1."""
    return 1
def func2():
    """A trivial 'normal' task: immediately return the constant 2."""
    return 2
def stream():
    """Yield task results in completion order, flattening any iterable result.

    Because ``call_external_api`` does all its work inside the pool thread,
    the slow API call overlaps with ``func1`` and ``func2``.
    """
    # Use `with` so the pool is shut down once streaming finishes
    # (the original never released the executor's threads).
    with concurrent.futures.ThreadPoolExecutor(3) as executor:
        futures = [
            executor.submit(call_external_api),
            executor.submit(func1),
            executor.submit(func2)
        ]
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            # NOTE(review): a str result would also test as Iterable; fine
            # for these tasks, but worth remembering in the general case.
            if isinstance(result, Iterable):
                for elem in result:
                    yield elem
            else:
                yield result
if __name__ == '__main__':
    # Drain the stream to show the order in which results arrive.
    print(list(stream()))
Prints:
[1, 2, 3, 4, 5, 6, 7, 8]
A Second Way
If you are forced to use `generator` as is (i.e. as a generator function), then make sure that it is iterated by a pool thread, as follows:
import concurrent.futures
from collections.abc import Iterable
def generator():
    """Pretend to call a slow API, then yield the integers 3 through 8."""
    import time
    # Simulate an api call:
    time.sleep(.5)
    yield from range(3, 9)
def iterate_generator(generator):
    """Exhaust *generator* (inside a pool thread) and return its values as a list."""
    materialized = list(generator)
    return materialized
def func1():
    """A trivial 'normal' task: immediately return the constant 1."""
    return 1
def func2():
    """A trivial 'normal' task: immediately return the constant 2."""
    return 2
def stream():
    """Yield task results in completion order, flattening any iterable result.

    The generator is iterated by a pool thread (via ``iterate_generator``),
    so the slow API work overlaps with ``func1`` and ``func2``.
    """
    # Use `with` so the pool is shut down once streaming finishes
    # (the original never released the executor's threads).
    with concurrent.futures.ThreadPoolExecutor(3) as executor:
        futures = [
            executor.submit(iterate_generator, generator()),
            executor.submit(func1),
            executor.submit(func2)
        ]
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            # NOTE(review): a str result would also test as Iterable; fine
            # for these tasks, but worth remembering in the general case.
            if isinstance(result, Iterable):
                yield from result
            else:
                yield result
if __name__ == '__main__':
    # Drain the stream to show the order in which results arrive.
    print(list(stream()))
Summary
It makes no sense to pass a generator function as the argument to concurrent.futures.submit
, since the real work is done not by calling the generator function but by iterating the iterator that the call returns.
Update: Solution Using Threads
from threading import Thread
from queue import Queue
from random import random
from time import sleep
class Sentinel:
    """Marker type: a queued instance means one worker is done producing."""
# Represents "this thread is done generating results":
# (consumers compare with `is`, so object identity is what matters)
sentinel = Sentinel()
def generator():
    """Simulate an API call, then yield 3..8 with randomized pacing."""
    sleep(random() / 8.0)  # simulated api-call latency
    for val in range(3, 9):
        # Jitter each yield so tasks do not always complete
        # at the same time:
        sleep(random() / 5.0)
        yield val
def iterate_generator(output_queue):
    """Drain generator() into *output_queue*, then signal completion."""
    for value in generator():
        output_queue.put(value)
    # Show we are not returning any more results:
    output_queue.put(sentinel)
def func1(output_queue):
    """Put the single result 1 on *output_queue* after a random delay."""
    # Randomness so that tasks do not always complete at the same time:
    sleep(random() / 4.0)
    output_queue.put(1)
    # Show we are not returning any more results:
    output_queue.put(sentinel)
def func2(output_queue):
    """Put the single result 2 on *output_queue* after a random delay."""
    # Randomness so that tasks do not always complete at the same time:
    sleep(random() / 5.0)
    output_queue.put(2)
    # Show we are not returning any more results:
    output_queue.put(sentinel)
def stream():
    """Merge results from three worker threads, yielding in arrival order.

    Each worker pushes its results onto a shared queue and finishes with a
    sentinel; once one sentinel per worker has been seen, every result has
    been yielded and the threads can be joined.
    """
    results = Queue()
    workers = [
        Thread(target=target, args=(results,))
        for target in (iterate_generator, func1, func2)
    ]
    for worker in workers:
        worker.start()
    remaining = len(workers)
    # Keep pulling until every worker has reported completion:
    while remaining:
        item = results.get()
        if item is sentinel:
            remaining -= 1  # one more worker finished
        else:
            yield item
    for worker in workers:
        worker.join()
if __name__ == '__main__':
    # Perform multiple runs to show that arrival order varies run to run:
    for _ in range(10):
        print(list(stream()))
Output from successive runs:
[2, 3, 4, 1, 5, 6, 7, 8]
[1, 2, 3, 4, 5, 6, 7, 8]
[2, 1, 3, 4, 5, 6, 7, 8]
[2, 1, 3, 4, 5, 6, 7, 8]
[2, 1, 3, 4, 5, 6, 7, 8]
[1, 2, 3, 4, 5, 6, 7, 8]
[2, 3, 1, 4, 5, 6, 7, 8]
[1, 3, 2, 4, 5, 6, 7, 8]
[2, 3, 1, 4, 5, 6, 7, 8]
[2, 1, 3, 4, 5, 6, 7, 8]