python · multithreading · flask · generator · concurrent.futures

Interleaving normal Python function results with generator output while the generator is executing


Is it possible to interleave normal method execution with generator execution using ThreadPoolExecutor from concurrent.futures?

import concurrent.futures
from app import executor


def generator():
    for val in external_api():
        yield val


def func1():
    return 1

def func2():
    return 2


def stream():
    generator_future = executor.submit(generator)
    future1 = executor.submit(func1)
    future2 = executor.submit(func2)

    for future in concurrent.futures.as_completed([generator_future, future1, future2]):
        if future == generator_future:
            generator_result = future.result()
            for value in generator_result:
                yield value
        else:
            yield future.result()

Once the generator has started executing, the normal method results cannot be yielded in stream(), even when func1 or func2 has completed. The normal method results are held back until the generator has finished yielding. Is it possible to yield the func1 and func2 results while the generator is still yielding?

Note: The above snippet runs inside a Flask application, using the Flask-Executor wrapper around ThreadPoolExecutor to push the application context to the worker threads.


Solution

  • I suspect there is some confusion about what is actually taking place here, so let's work through it together:

    When you call a generator function, no matter how complicated its body is or how long it takes to yield all of its values, it returns almost instantaneously with an iterator. It is iterating that iterator that actually executes the code inside the generator function and produces the elements created by the yield statements. The following demonstrates that what the multithreading pool returns is an Iterator (and therefore also an Iterable), which you should already know, since you are iterating over the result:

    import concurrent.futures
    from collections.abc import Iterator
    
    def generator():
        import time
    
        # Simulate an api call:
        time.sleep(.5)
    
        for val in range(3, 9):
            yield val
    
    
    def func1():
        return 1
    
    def func2():
        return 2
    
    
    def stream():
        executor = concurrent.futures.ThreadPoolExecutor(3)
        generator_future = executor.submit(generator)
        future1 = executor.submit(func1)
        future2 = executor.submit(func2)
    
        for future in concurrent.futures.as_completed([generator_future, future1, future2]):
            if future == generator_future:
                generator_result = future.result()
                # Demonstrate that what is returned from calling
                # generator() is an iterator:
                print(isinstance(generator_result, Iterator))
                for value in generator_result:
                    yield value
            else:
                yield future.result()
    
    if __name__ == '__main__':
        print(list(stream()))
    

    Prints:

    True
    [3, 4, 5, 6, 7, 8, 1, 2]
    

    So of the three submitted tasks, all of them relatively trivial, the first one submitted, the call to the generator function, completes first. But you then immediately start iterating the returned iterator, which is what actually calls your (probably non-trivial) external api and produces the multiple values to be yielded. You will not check the completion of either of the other two submitted tasks until you have retrieved all of the yielded values, and that is what holds up the other results.

    Why are you using a generator function at all? It means that the actual call to the external api is performed by your main thread and not by one of the pool threads. To overlap the api call with the other submitted tasks, try:

    import concurrent.futures
    from collections.abc import Iterable
    
    def call_external_api():
        import time
    
        # Simulate an api call:
        time.sleep(.5)
    
        # Results from the call to the api:
        return [3, 4, 5, 6, 7, 8]
    
    
    def func1():
        return 1
    
    def func2():
        return 2
    
    
    def stream():
        executor = concurrent.futures.ThreadPoolExecutor(3)
        futures = [
            executor.submit(call_external_api),
            executor.submit(func1),
            executor.submit(func2)
        ]
    
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if isinstance(result, Iterable):
                for elem in result:
                    yield elem
            else:
                yield result
    
    if __name__ == '__main__':
        print(list(stream()))
    

    Prints:

    [1, 2, 3, 4, 5, 6, 7, 8]
    

    A Second Way

    If you are forced to use the generator function as is, then make sure that the iteration of it is done by a pool thread, as follows:

    import concurrent.futures
    from collections.abc import Iterable
    
    def generator():
        import time
    
        # Simulate an api call:
        time.sleep(.5)
    
        for val in range(3, 9):
            yield val
    
    def iterate_generator(gen):
        # Run the iteration (i.e. the real work) in a pool thread
        # and return all produced values at once:
        return list(gen)
    
    def func1():
        return 1
    
    def func2():
        return 2
    
    
    def stream():
        executor = concurrent.futures.ThreadPoolExecutor(3)
        futures = [
            executor.submit(iterate_generator, generator()),
            executor.submit(func1),
            executor.submit(func2)
        ]
    
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if isinstance(result, Iterable):
                for elem in result:
                    yield elem
            else:
                yield result
    
    if __name__ == '__main__':
        print(list(stream()))
    

    Summary

    It makes no sense to pass a generator function as the argument to an executor's submit method, since the real work is not done by calling the generator function but rather by iterating over the iterator that the call returns.
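    To make this concrete, here is a small timing sketch (the function name and sleep durations are my own, not from the question). The future for a submitted generator function resolves almost immediately; the real work only happens when the caller iterates the returned iterator:

```python
import concurrent.futures
import time

def slow_generator():
    # Each value takes noticeable time to produce:
    for val in range(3):
        time.sleep(0.5)
        yield val

with concurrent.futures.ThreadPoolExecutor() as executor:
    start = time.monotonic()
    future = executor.submit(slow_generator)
    iterator = future.result()  # resolves almost instantly
    submit_elapsed = time.monotonic() - start

    # The real work happens here, in the *calling* thread:
    values = list(iterator)
    total_elapsed = time.monotonic() - start

print(f"future resolved after {submit_elapsed:.3f}s")    # well under 0.5s
print(f"iteration finished after {total_elapsed:.3f}s")  # roughly 1.5s
```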

    Update: Solution Using Threads

    from threading import Thread
    from queue import Queue
    from random import random
    from time import sleep
    
    class Sentinel:
        pass
    
    # Represents "this thread is done generating results":
    sentinel = Sentinel()
    
    def generator():
        # Simulate an api call:
        sleep(random() / 8.0)
    
        for val in range(3, 9):
            # Some randomness so that tasks do not always
            # complete at the same time:
            sleep(random() / 5.0)
            yield val
    
    def iterate_generator(output_queue):
        for result in generator():
            output_queue.put(result)
        # Show we are not returning any more results:
        output_queue.put(sentinel)
    
    def func1(output_queue):
        # Some randomness so that tasks do not always
        # complete at the same time:
        sleep(random() / 4.0)
        output_queue.put(1)
        # Show we are not returning any more results:
        output_queue.put(sentinel)
    
    def func2(output_queue):
        # Some randomness so that tasks do not always
        # complete at the same time:
        sleep(random() / 5.0)
        output_queue.put(2)
        # Show we are not returning any more results:
        output_queue.put(sentinel)
    
    def stream():
        output_queue = Queue()
    
        threads = [
            Thread(target=iterate_generator, args=(output_queue,)),
            Thread(target=func1, args=(output_queue,)),
            Thread(target=func2, args=(output_queue,))
        ]
        for thread in threads:
            thread.start()
    
        n_threads = len(threads)
        # Retrieve results until we have seen n_threads
        # sentinel values. Then we know all tasks have finished:
        while n_threads:
            result = output_queue.get()
            if result is sentinel:
                n_threads -= 1
            else:
                yield result
    
        for thread in threads:
            thread.join()
    
    if __name__ == '__main__':
        # Perform multiple runs:
        for _ in range(10):
            print(list(stream()))
    

    Output from successive runs:

    [2, 3, 4, 1, 5, 6, 7, 8]
    [1, 2, 3, 4, 5, 6, 7, 8]
    [2, 1, 3, 4, 5, 6, 7, 8]
    [2, 1, 3, 4, 5, 6, 7, 8]
    [2, 1, 3, 4, 5, 6, 7, 8]
    [1, 2, 3, 4, 5, 6, 7, 8]
    [2, 3, 1, 4, 5, 6, 7, 8]
    [1, 3, 2, 4, 5, 6, 7, 8]
    [2, 3, 1, 4, 5, 6, 7, 8]
    [2, 1, 3, 4, 5, 6, 7, 8]
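
    The same queue-plus-sentinel pattern also works when the workers are started through an executor's submit method (for example the Flask-Executor wrapper mentioned in the question, whose submit mirrors ThreadPoolExecutor.submit) rather than bare Thread objects. A minimal sketch with hypothetical worker payloads:

```python
import concurrent.futures
from queue import Queue

sentinel = object()  # marks "this worker is done producing results"

def worker(output_queue, values):
    # Stand-in for a generator iteration or an api call
    # that pushes its results onto the shared queue:
    for val in values:
        output_queue.put(val)
    output_queue.put(sentinel)

def stream():
    output_queue = Queue()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            executor.submit(worker, output_queue, [3, 4, 5]),
            executor.submit(worker, output_queue, [1]),
            executor.submit(worker, output_queue, [2]),
        ]
        # Drain the queue until every worker has sent its sentinel:
        remaining = len(futures)
        while remaining:
            result = output_queue.get()
            if result is sentinel:
                remaining -= 1
            else:
                yield result

# Arrival order varies between runs; sorting makes the output deterministic:
print(sorted(stream()))
```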