I'm using PostgreSQL & asyncpg.
class PgDb:
# noinspection SpellCheckingInspection
def __init__(self, conn: asyncpg.connection.Connection):
self.conn = conn
async def select(self, sql: str, args: Union[list, Dict[str, Any]] = []) -> List[Dict[str, Any]]:
sql, _args = self.__convert_placeholders(sql, args)
return [dict(row) for row in await self.conn.fetch(sql, *_args)]
class DbPoolSingleton:
db_pool: Optional[asyncpg.pool.Pool] = None
@staticmethod
async def create_pool():
config = get_postgres_config()
pool: asyncpg.Pool = await asyncpg.create_pool(
...,
min_size=30,
max_size=40
)
print("Pool created")
return pool
@staticmethod
async def get_pool() -> asyncpg.pool.Pool:
if not DbPoolSingleton.db_pool:
DbPoolSingleton.db_pool = await DbPoolSingleton.create_pool()
return DbPoolSingleton.db_pool
@staticmethod
async def terminate_pool():
(await DbPoolSingleton.get_pool()).terminate()
DbPoolSingleton.db_pool = None
print("Pool terminated")
import asyncio
from helpers.pg_rdb_helper import DbPoolSingleton, PgDb
async def test_synchronous():
conn = await (await DbPoolSingleton.get_pool()).acquire()
db = PgDb(conn)
sql = """samplesql"""
total_start = start = datetime.datetime.now()
for i in range(20):
start = datetime.datetime.now()
rows = await db.select(sql)
end = datetime.datetime.now()
print(f"{i}st query took: ", (end-start).total_seconds())
total_end = datetime.datetime.now()
print(f"total query took: ", (total_end-total_start).total_seconds())
=> total query took: 2.131297
async def test_asynchronous():
db_pool = await DbPoolSingleton.get_pool()
sql = """samplesql"""
total_start = datetime.datetime.now()
tasks = []
for i in range(20):
db = PgDb(await db_pool.acquire())
task = asyncio.create_task(db.select(sql))
tasks.append(task)
await asyncio.gather(*tasks)
total_end = datetime.datetime.now()
print(f"total query took: ", (total_end-total_start).total_seconds())
===> total query took: 2.721282
Here, I have a function which is simple multiple queries call, the first version is synchronous version which await every single query without using asyncio
, the second one is using asyncio.gather
to run these query in background (at least this is my assumption).
Then turn out, as you saw the result asynchronous version
was completely slower than synchronous version
. Basically I know in asynchronous version
we have some overhead for getting connection from pool for every single query which caused it a bit slower.
So how could we fix asynchronous version
to take advandtage of asyncpg
and asyncio
.
After I investigate, I have some fix for this asynchronous version
but bot of them got some error.
Asynchronous fix 1
async def test_asynchronous():
db_pool = await DbPoolSingleton.get_pool()
sql = """samplesql"""
total_start = datetime.datetime.now()
tasks = []
async with db_pool.acquire() as conn:
db = PgDb(conn)
for i in range(20):
task = asyncio.create_task(db.select(sql))
tasks.append(task)
await asyncio.gather(*tasks)
total_end = datetime.datetime.now()
print(f"total query took: ", (total_end-total_start).total_seconds())
I got this error ===>
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
Now, I gave up with this problem, please help me to resolve it??
My question: So how could we fix asynchronous version
to take advandtage of asyncpg
and asyncio
.
You are attempting to use the same connection from the pool in all of the tasks you create in your async function.
Try this working example where the tasks each acquire its own connection from the pool:
from typing import Optional
import asyncio, datetime
import asyncpg
TRIALS = 200
SQL = "select count(*) from users"
class DbPoolSingleton:
db_pool: Optional[asyncpg.pool.Pool] = None
@staticmethod
async def create_pool():
pool: asyncpg.Pool = await asyncpg.create_pool(
min_size=1,
max_size=10
)
print("Pool created")
return pool
@staticmethod
async def get_pool() -> asyncpg.pool.Pool:
if not DbPoolSingleton.db_pool:
DbPoolSingleton.db_pool = await DbPoolSingleton.create_pool()
return DbPoolSingleton.db_pool
@staticmethod
async def terminate_pool():
(await DbPoolSingleton.get_pool()).terminate()
DbPoolSingleton.db_pool = None
print("Pool terminated")
async def test_synchronous() -> None:
conn = await(await DbPoolSingleton.get_pool()).acquire()
total_start = start = datetime.datetime.now()
runtime_total = 0.0
for i in range(TRIALS):
start = datetime.datetime.now()
result = await conn.fetchval(SQL)
end = datetime.datetime.now()
qruntime = (end - start).total_seconds()
runtime_total += qruntime
#print(f"{i}st query took: {qruntime} to get {result}")
total_end = datetime.datetime.now()
print(f"sync total query took: {(total_end-total_start).total_seconds()} wallclock seconds and {runtime_total} run seconds")
async def run_query(pool, sql) -> tuple:
async with pool.acquire() as conn:
qstart = datetime.datetime.now()
result = await conn.fetchval(sql)
qend = datetime.datetime.now()
return (result, (qend - qstart).total_seconds())
async def test_asynchronous() -> None:
pool = await DbPoolSingleton.get_pool()
total_start = datetime.datetime.now()
#runtime_total = 0.0
tasks = []
for i in range(TRIALS):
tasks.append(run_query(pool, SQL))
results = await asyncio.gather(*tasks)
runtime_total = sum([t[1] for t in results])
total_end = datetime.datetime.now()
print(f"async total query took: {(total_end-total_start).total_seconds()} wallclock seconds and {runtime_total} run seconds")
# print(f"total query took: {(total_end-total_start).total_seconds()} wallclock seconds")
if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(test_synchronous())
loop.run_until_complete(test_asynchronous())
loop.run_until_complete(test_synchronous())
loop.run_until_complete(test_asynchronous())