Tags: python, asynchronous, concurrency, python-asyncio, aiohttp

Fetch data concurrently within a for loop using asyncio


I would like to optimize my current function called process_coordinates.

import asyncio
from aiohttp import ClientSession
from tqdm.asyncio import tqdm
import pandas as pd
from streetlevel import streetview


# Sample input: maps an integer id to a record holding a 'country' code and
# the 'longitude' / 'latitude' of the coordinate to look up.
data_dict = {
    1: {'country': 'AZE', 'longitude': 48.84328388521507, 'latitude': 39.75349231850633},
    2: {'country': 'AZE', 'longitude': 46.42568983461019, 'latitude': 41.74686028542068},
    3: {'country': 'AZE', 'longitude': 46.77391084838791, 'latitude': 41.05880534123746},
    4: {'country': 'AZE', 'longitude': 46.57032078287734, 'latitude': 39.312871420751485},
    5: {'country': 'AZE', 'longitude': 47.26319069021316, 'latitude': 41.28950907274436},
    6: {'country': 'AZE', 'longitude': 46.49956247696345, 'latitude': 40.96062005058899},
    7: {'country': 'AZE', 'longitude': 48.291171317357815, 'latitude': 38.939108897065445},
    8: {'country': 'AZE', 'longitude': 47.12081533506723, 'latitude': 39.681052295694656},
}


async def process_coordinates(data_dict):
    """Look up a panorama for every coordinate, one request at a time.

    Each entry of ``data_dict`` is passed to
    ``streetview.find_panorama_async`` and awaited before the next request
    is issued.

    Returns a dict with the same keys as ``data_dict``. Every value holds
    'longitude', 'latitude' and 'date' (as a string) of the panorama, or
    ``None`` for all three when no panorama was returned.
    """
    results = {}

    async with ClientSession() as session:
        progress = tqdm(data_dict.items(), desc="Parsing coordinates", unit="coordinate")
        for key, coords in progress:
            found = await streetview.find_panorama_async(coords['latitude'], coords['longitude'], session)

            # Guard clause: no panorama for this coordinate.
            if found is None:
                results[key] = {'longitude': None, 'latitude': None, 'date': None}
                continue

            results[key] = {
                'longitude': found.lon,
                'latitude': found.lat,
                'date': str(found.date),
            }

    return results


async def main(data_dict):
    """Fetch panorama data for ``data_dict`` and save it as 'results.parquet'.

    The dict keys become a regular column named 'uuid' in the written table.
    """
    pano_results = await process_coordinates(data_dict)

    # Build the table, then turn the index (the dict keys) into a 'uuid' column.
    frame = (
        pd.DataFrame.from_dict(pano_results, orient='index')
        .reset_index()
        .rename(columns={'index': 'uuid'})
    )

    frame.to_parquet('results.parquet', index=False)



# Script entry point: run main() on the sample data to completion.
asyncio.run(main(data_dict))

Right now it iterates over one coordinate at a time and requests information for that coordinate via find_panorama_async. I would like to speed things up and request information for four coordinates in parallel. The responses are stored in a dictionary.


Solution

  • await waits for the result of the called function, so it doesn't allow other tasks to start in the meantime.

    One method is to start a task with create_task() without await, and use await later to get the result.

    This allows you to start many tasks, keep them in a list, and later use await on every element of the list to get the results.

    async def process_coordinates(data_dict):
        """Request panoramas for all coordinates concurrently using tasks.

        Every request is wrapped in ``asyncio.create_task()`` before any of
        them is awaited, so the requests can run concurrently. The tasks are
        then awaited in the order in which they were created.

        Returns a dict with the same keys as ``data_dict``. Every value holds
        'longitude', 'latitude' and 'date' (as a string) of the panorama, or
        ``None`` for all three when no panorama was returned.
        """
        pano_dict = {}

        async with ClientSession() as session:
            all_tasks = []

            # start all tasks

            for k, v in tqdm(data_dict.items(), desc="Parsing coordinates", unit="coordinate"):
                task = asyncio.create_task(streetview.find_panorama_async(v['latitude'], v['longitude'], session))
                all_tasks.append(task)

            # wait for results

            # Tasks were appended in data_dict order, so zip() pairs each task
            # with its original key. enumerate() would silently replace the
            # keys with 0..n-1.
            for k, task in zip(data_dict, all_tasks):
                pano = await task
                if pano is not None:
                    pano_dict[k] = {
                        'longitude': pano.lon,
                        'latitude': pano.lat,
                        'date': str(pano.date)
                    }
                else:
                    pano_dict[k] = {
                        'longitude': None,
                        'latitude': None,
                        'date': None
                    }

        return pano_dict
    

    With the original code tqdm shows me 27.03coordinate/s,
    and with this version it shows 128561.04coordinate/s.
    But that figure only measures how fast the tasks are created in the loop; it doesn't measure how long the whole code takes.


    You can also create all the "tasks" (coroutines) without create_task() and without await, which does not start them,
    and later use gather(task1, task2, ...) to start them all at once; it finally returns a list of results in the same order.

    async def process_coordinates(data_dict):
        """Request panoramas for all coordinates concurrently using gather().

        One coroutine per coordinate is created (but not awaited) and all of
        them are handed to ``asyncio.gather()``, which runs them concurrently
        and returns the results in the order the coroutines were passed in.

        Returns a dict with the same keys as ``data_dict``. Every value holds
        'longitude', 'latitude' and 'date' (as a string) of the panorama, or
        ``None`` for all three when no panorama was returned.
        """
        pano_dict = {}

        async with ClientSession() as session:
            all_tasks = []

            # create all tasks but not run them

            for k, v in tqdm(data_dict.items(), desc="Parsing coordinates", unit="coordinate"):
                task = streetview.find_panorama_async(v['latitude'], v['longitude'], session)
                all_tasks.append(task)

            # start all tasks and wait for results

            all_results = await asyncio.gather(*all_tasks)  # needs * to unpack list

            # get results

            # gather() keeps the input order, so zip() pairs each result with
            # its original key. enumerate() would silently replace the keys
            # with 0..n-1.
            for k, pano in zip(data_dict, all_results):
                if pano is not None:
                    pano_dict[k] = {
                        'longitude': pano.lon,
                        'latitude': pano.lat,
                        'date': str(pano.date)
                    }
                else:
                    pano_dict[k] = {
                        'longitude': None,
                        'latitude': None,
                        'date': None
                    }

        return pano_dict
    

    Sometimes it even shows me 219310.01coordinate/s.
    But again, this doesn't measure how long the whole code takes.


    There is also as_completed(), which gives the results in a different order: the order in which the tasks finish.


    EDIT:

    Version with a Semaphore to limit the number of tasks running at the same time.

    For testing I added sleep(), because normally it runs too fast and it is hard to see whether the limit works.

    async def my_task(k, v, session, sem, delay=3):
        """Fetch the panorama for one coordinate while holding ``sem``.

        ``k`` is only used for the progress message and ``v`` must provide
        'latitude' and 'longitude'. ``delay`` is an artificial pause in
        seconds that makes the concurrency limit visible during tests; pass
        ``delay=0`` to disable it.

        Returns whatever ``streetview.find_panorama_async`` returns.
        """
        async with sem:
            print(f'running {k}')       # only for tests
            await asyncio.sleep(delay)  # only for tests
            return await streetview.find_panorama_async(v['latitude'], v['longitude'], session)
    
    async def process_coordinates(data_dict):
        """Request panoramas concurrently, at most three at the same time.

        One ``my_task`` coroutine per coordinate is created; all of them share
        a single ``asyncio.Semaphore(3)``, so no more than three hold the
        semaphore at once. ``asyncio.gather()`` runs them and returns the
        results in the order the coroutines were passed in.

        Returns a dict with the same keys as ``data_dict``. Every value holds
        'longitude', 'latitude' and 'date' (as a string) of the panorama, or
        ``None`` for all three when no panorama was returned.
        """
        pano_dict = {}

        async with ClientSession() as session:
            all_tasks = []

            # create all tasks but not run them

            sem = asyncio.Semaphore(3)

            for k, v in tqdm(data_dict.items(), desc="Parsing coordinates", unit="coordinate"):
                task = my_task(k, v, session, sem)
                all_tasks.append(task)

            # start all tasks and wait for results

            all_results = await asyncio.gather(*all_tasks)  # needs * to unpack list

            # get results

            # gather() keeps the input order, so zip() pairs each result with
            # its original key. enumerate() would silently replace the keys
            # with 0..n-1.
            for k, pano in zip(data_dict, all_results):
                if pano is not None:
                    pano_dict[k] = {
                        'longitude': pano.lon,
                        'latitude': pano.lat,
                        'date': str(pano.date)
                    }
                else:
                    pano_dict[k] = {
                        'longitude': None,
                        'latitude': None,
                        'date': None
                    }

        return pano_dict
    

    Version with a universal function with_sem(sem, name, task), which can be used with any task.

    async def with_sem(sem, name, task, delay=2):
        """Await ``task`` while holding the semaphore ``sem``.

        ``name`` is only used for the progress message. ``task`` is any
        awaitable; it is awaited only after the semaphore has been acquired.
        ``delay`` is an artificial pause in seconds that makes the concurrency
        limit visible during tests; pass ``delay=0`` to disable it.

        Returns the result of ``task``.
        """
        async with sem:
            print(f'running {name}')    # only for tests
            await asyncio.sleep(delay)  # only for tests
            return await task
    
    async def process_coordinates(data_dict):
        """Request panoramas concurrently, at most three at the same time.

        Every ``streetview.find_panorama_async`` coroutine is wrapped in
        ``with_sem`` together with a shared ``asyncio.Semaphore(3)``, so no
        more than three hold the semaphore at once. ``asyncio.gather()`` runs
        them and returns the results in the order the coroutines were passed
        in.

        Returns a dict with the same keys as ``data_dict``. Every value holds
        'longitude', 'latitude' and 'date' (as a string) of the panorama, or
        ``None`` for all three when no panorama was returned. Each entry is
        also printed as it is stored.
        """
        pano_dict = {}

        async with ClientSession() as session:
            all_tasks = []

            # create all tasks but not run them

            sem = asyncio.Semaphore(3)

            for k, v in tqdm(data_dict.items(), desc="Parsing coordinates", unit="coordinate"):
                task = with_sem(sem, k, streetview.find_panorama_async(v['latitude'], v['longitude'], session))
                all_tasks.append(task)

            # start all tasks and wait for results

            all_results = await asyncio.gather(*all_tasks)  # needs * to unpack list

            # get results

            # gather() keeps the input order, so zip() pairs each result with
            # its original key. enumerate() would silently replace the keys
            # with 0..n-1.
            for k, pano in zip(data_dict, all_results):
                if pano is not None:
                    pano_dict[k] = {
                        'longitude': pano.lon,
                        'latitude': pano.lat,
                        'date': str(pano.date)
                    }
                else:
                    pano_dict[k] = {
                        'longitude': None,
                        'latitude': None,
                        'date': None
                    }

                print(k, pano_dict[k])

        return pano_dict