Tags: python, python-asyncio, faust

Future inside future always pending


P.S. I opened an issue: https://github.com/robinhood/faust/issues/702

I am developing a Faust app:

from concurrent.futures import ProcessPoolExecutor, as_completed

import faust

app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic')


@app.task()
async def check():
    # 3 is the number of different folders where the archives are placed
    with ProcessPoolExecutor(max_workers=3) as executor:
        fs = [executor.submit(handle, directory) for directory in ['dir1', 'dir2', 'dir3']]
        for future in as_completed(fs):
            future.result()


def handle(directory):
    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10})  # always in pending status

I faced a problem: the method sink.send_soon returns a FutureMessage(asyncio.Future, Awaitable[RecordMetadata]) that always stays in pending status.

This is a situation where one future is nested inside another.

Note: the function handle has to be sync, because an async function cannot be passed to ProcessPoolExecutor. The method send_soon is a sync method, and according to this example https://github.com/robinhood/faust/blob/b5e159f1d104ad4a6aa674d14b6ba0be19b5f9f5/examples/windowed_aggregation.py#L47 awaiting it is not necessary.
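To illustrate the note above, here is a minimal sketch (not from the original post; the handle_async name is made up) of why an async function cannot cross the process boundary: calling it produces a coroutine object, and coroutine objects cannot be pickled, which is what the executor needs to do to ship values between processes:

```python
import pickle

async def handle_async(directory):
    return directory

# This is essentially what fails when a ProcessPoolExecutor worker
# tries to pickle a coroutine to send it back to the parent:
coro = handle_async('dir1')
try:
    pickle.dumps(coro)
except TypeError as exc:
    print(exc)  # e.g. "cannot pickle 'coroutine' object"
finally:
    coro.close()  # avoid a "coroutine was never awaited" warning
```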

Is there any way to handle the pending future?

I also tried this:

import asyncio
from concurrent.futures import ProcessPoolExecutor

import faust

loop = asyncio.get_event_loop()

app = faust.App('my-app-name', broker='kafka://localhost:9092', loop=loop)
sink = app.topic('topic')


@app.task()
async def check():
    tasks = []
    with ProcessPoolExecutor(max_workers=3) as executor:
        for dir_ in ['dir1', 'dir2', 'dir3']:
            task = asyncio.create_task(run_dir_handling(executor, dir_))
            tasks.append(task)

        await asyncio.gather(*tasks)


async def run_dir_handling(executor, dir_):
    print('running blocking')
    await loop.run_in_executor(executor, handle, dir_)


def handle(directory):
    print('Handle directory')

    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka

    # send_soon is a plain sync method, while send is async.
    # An async handle cannot be used here: passing it to the executor
    # fails with "TypeError: cannot pickle 'coroutine' object"
    f = sink.send_soon(value={'ts': 1234567890, 'count': 10, 'dir': directory})
    print(f)  # always <FutureMessage pending>

But that didn't work either.

It seems the event loop never even gets a chance to run send_soon: handle executes in a child process with its own copy of the app, so the FutureMessage is created in the child's memory and there is no running Faust loop there to deliver it.
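A loose analogue of the situation in plain asyncio, with no Faust involved: a future only completes when the loop that owns it actually runs the code that resolves it, and in the worker process no such loop is running.

```python
import asyncio

loop = asyncio.new_event_loop()
fut = loop.create_future()

# Schedule the resolution on the loop, roughly like send_soon
# schedules a message for later delivery...
loop.call_soon(fut.set_result, 'sent')

# ...but until the loop actually runs, the future stays pending,
# just like the FutureMessage inside the child process.
print(fut.done())  # False

loop.run_until_complete(fut)  # once the loop runs, the result appears
print(fut.result())           # 'sent'
loop.close()
```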


Solution

  • Restructured the code so that only the blocking work runs in the executor, and the send happens back in the event loop:

    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    
    import faust
    
    loop = asyncio.get_event_loop()
    
    app = faust.App('my-app-name', broker='kafka://localhost:9092')
    sink = app.topic('topic1')
    
    
    @app.task()
    async def check():
        tasks = []
    
        with ProcessPoolExecutor(max_workers=3) as executor:
            for dir_ in ['dir1', 'dir2', 'dir3']:
                task = asyncio.create_task(run_dir_handling(executor, dir_))
                tasks.append(task)
    
            await asyncio.gather(*tasks)
    
    
    async def run_dir_handling(executor, dir_):
        directory = await loop.run_in_executor(executor, handle, dir_)
        # the send happens back in the event loop, in the parent process,
        # so the FutureMessage actually gets delivered
        await sink.send(value={'dir': directory})
    
    def handle(directory):
        print('Handle directory')
    
        # finding archives in directory
        # unpacking 7z with mdb-files
        # converting mdb tables to csv
        # reading csv to dataframe
        # some data manipulating
        # and at last sending dataframe records to kafka
    
        return directory
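The same pattern can be run standalone with the Faust pieces stubbed out. In this sketch a ThreadPoolExecutor stands in for the ProcessPoolExecutor (so it needs no __main__ guard), and fake_sink_send stands in for sink.send; both names are invented for illustration, but the structure matches the solution above:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

sent = []

async def fake_sink_send(value):
    # stand-in for sink.send: just record what would go to Kafka
    sent.append(value)

def handle(directory):
    # blocking work (unpacking, csv conversion, ...) would go here
    return directory

async def run_dir_handling(executor, dir_):
    loop = asyncio.get_running_loop()
    directory = await loop.run_in_executor(executor, handle, dir_)
    # send from the event loop, not from inside the worker
    await fake_sink_send({'dir': directory})

async def check():
    with ThreadPoolExecutor(max_workers=3) as executor:
        await asyncio.gather(*(run_dir_handling(executor, d)
                               for d in ['dir1', 'dir2', 'dir3']))

asyncio.run(check())
print(sorted(item['dir'] for item in sent))  # ['dir1', 'dir2', 'dir3']
```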