I would like to optimize my current function called process_coordinates.
import asyncio
from aiohttp import ClientSession
from tqdm.asyncio import tqdm
import pandas as pd
from streetlevel import streetview
data_dict = {
1: {'country': 'AZE', 'longitude': 48.84328388521507, 'latitude': 39.75349231850633},
2: {'country': 'AZE', 'longitude': 46.42568983461019, 'latitude': 41.74686028542068},
3: {'country': 'AZE', 'longitude': 46.77391084838791, 'latitude': 41.05880534123746},
4: {'country': 'AZE', 'longitude': 46.57032078287734, 'latitude': 39.312871420751485},
5: {'country': 'AZE', 'longitude': 47.26319069021316, 'latitude': 41.28950907274436},
6: {'country': 'AZE', 'longitude': 46.49956247696345, 'latitude': 40.96062005058899},
7: {'country': 'AZE', 'longitude': 48.291171317357815, 'latitude': 38.939108897065445},
8: {'country': 'AZE', 'longitude': 47.12081533506723, 'latitude': 39.681052295694656},
}
async def process_coordinates(data_dict):
pano_dict = {}
async with ClientSession() as session:
for k, v in tqdm(data_dict.items(), desc="Parsing coordinates", unit="coordinate"):
pano = await streetview.find_panorama_async(v['latitude'], v['longitude'], session)
if pano is not None:
pano_dict[k] = {
'longitude': pano.lon,
'latitude': pano.lat,
'date': str(pano.date)
}
else:
pano_dict[k] = {
'longitude': None,
'latitude': None,
'date': None
}
return pano_dict
async def main(data_dict):
pano_results = await process_coordinates(data_dict)
df = pd.DataFrame.from_dict(pano_results, orient='index')
df.reset_index(inplace=True)
df.rename(columns={'index': 'uuid'}, inplace=True)
df.to_parquet('results.parquet', index=False)
asyncio.run(main(data_dict))
Right now, it is capable of iterating over one coordinate per iteration and request information for the given coordinate via find_panorama_async
. I would like to speed things up, and request information for four coordinates in parallel. The response is stored as a dictionary.
await
is waiting for result from executed function and it doesn't allow to start other tasks at the same time.
One of method is to start task with create_task() without await
and later use await
to get result.
This allows to start many tasks and keep them on list, and later use await
on every element on list to get results.
async def process_coordinates(data_dict):
pano_dict = {}
async with ClientSession() as session:
all_tasks = []
# start all tasks
for k, v in tqdm(data_dict.items(), desc="Parsing coordinates", unit="coordinate"):
# pano = await streetview.find_panorama_async(v['latitude'], v['longitude'], session)
task = asyncio.create_task(streetview.find_panorama_async(v['latitude'], v['longitude'], session))
all_tasks.append(task)
# wait for results
for k, task in enumerate(all_tasks):
pano = await task
if pano is not None:
pano_dict[k] = {
'longitude': pano.lon,
'latitude': pano.lat,
'date': str(pano.date)
}
else:
pano_dict[k] = {
'longitude': None,
'latitude': None,
'date': None
}
return pano_dict
Original code shows me tqdm
with 27.03coordinate/s
,
and this version shows me 128561.04coordinate/s
.
But it doesn't count how long takes all code.
You can also create all "tasks" without create_task
and with await
and it will not start them,
and later you can use gather(task1, task2, ...) to start them all at once and finally it gives list of results with the same order.
async def process_coordinates(data_dict):
pano_dict = {}
async with ClientSession() as session:
all_tasks = []
# create all tasks but not run them
for k, v in tqdm(data_dict.items(), desc="Parsing coordinates", unit="coordinate"):
# pano = await streetview.find_panorama_async(v['latitude'], v['longitude'], session)
task = streetview.find_panorama_async(v['latitude'], v['longitude'], session)
all_tasks.append(task)
# start all tasks and wait for results
all_results = await asyncio.gather(*all_tasks) # needs * to unpack list
#print(all_results)
# get results
for k, pano in enumerate(all_results):
#pano = await task # no need await
if pano is not None:
pano_dict[k] = {
'longitude': pano.lon,
'latitude': pano.lat,
'date': str(pano.date)
}
else:
pano_dict[k] = {
'longitude': None,
'latitude': None,
'date': None
}
return pano_dict
Sometimes it shows me even 219310.01coordinate/s
.
But it doesn't count how long takes all code.
There is also as_completed() which gives results in different order - in order of finished tasks.
EDIT:
Version with Semaphore
to limit number of tasks at the same time.
For tests I added sleep()
because normally it runs too fast and hard to see if it works.
async def my_task(k, v, session, sem):
async with sem:
print(f'running {k}') # only for tests
await asyncio.sleep(3) # only for tests
return await streetview.find_panorama_async(v['latitude'], v['longitude'], session)
async def process_coordinates(data_dict):
pano_dict = {}
async with ClientSession() as session:
all_tasks = []
# create all tasks but not run them
sem = asyncio.Semaphore(3)
for k, v in tqdm(data_dict.items(), desc="Parsing coordinates", unit="coordinate"):
# pano = await streetview.find_panorama_async(v['latitude'], v['longitude'], session)
#task = streetview.find_panorama_async(v['latitude'], v['longitude'], session)
task = my_task(k, v, session, sem)
all_tasks.append(task)
# start all tasks and wait for results
all_results = await asyncio.gather(*all_tasks) # needs * to unpack list
#print(all_results)
# get results
for k, pano in enumerate(all_results):
#pano = await task # no need await
if pano is not None:
pano_dict[k] = {
'longitude': pano.lon,
'latitude': pano.lat,
'date': str(pano.date)
}
else:
pano_dict[k] = {
'longitude': None,
'latitude': None,
'date': None
}
return pano_dict
Version with universal function with_sem(semaphore, name, task)
which can run with any task.
async def with_sem(sem, name, task):
async with sem:
print(f'running {name}') # only for tests
await asyncio.sleep(2) # only for tests
return await task
async def process_coordinates(data_dict):
pano_dict = {}
async with ClientSession() as session:
all_tasks = []
# create all tasks but not run them
sem = asyncio.Semaphore(3)
for k, v in tqdm(data_dict.items(), desc="Parsing coordinates", unit="coordinate"):
# pano = await streetview.find_panorama_async(v['latitude'], v['longitude'], session)
#task = streetview.find_panorama_async(v['latitude'], v['longitude'], session)
task = with_sem(sem, k, streetview.find_panorama_async(v['latitude'], v['longitude'], session))
all_tasks.append(task)
# start all tasks and wait for results
all_results = await asyncio.gather(*all_tasks) # needs * to unpack list
#print(all_results)
# get results
for k, pano in enumerate(all_results):
#pano = await task # no need await
if pano is not None:
pano_dict[k] = {
'longitude': pano.lon,
'latitude': pano.lat,
'date': str(pano.date)
}
else:
pano_dict[k] = {
'longitude': None,
'latitude': None,
'date': None
}
print(k, pano_dict[k])
return pano_dict