I would like to run 5 queries asynchronously using the psycopg_pool. I am not entirely familiar with asyncio and running into an issue. I am hoping someone would be helpful in explaining and debuging my code below as I am not sure what I am doing wrong.
import psycopg
import psycopg_pool
from psycopg.rows import dict_row
class AsyncDBPool:
def __init__(self, pool_size:int):
self.pool = psycopg_pool.AsyncConnectionPool(
conninfo="host={} port={} dbname={} user={} password={}" \
.format(HOSTNAME, PORT, DBNAME, USERNAME, PASSWORD),
min_size= 1,
max_size= pool_size,
timeout=30,
open=False
)
async def open_pool(self):
await self.pool.open()
await self.pool.wait()
async def execute(self, query:str, args:tuple=()):
async with self.pool.connection() as conn:
async with conn.cursor(row_factory=dict_row) as cur:
await cur.execute(query, args)
await conn.commit()
async def execute_all(self, queries:list):
assert isinstance(queries, list), "must be list"
asyncio.gather(*(self.execute(qa['query'], qa['args']) for qa in queries), return_exceptions=True)
if __name__ == "__main__":
queries = [
{
"query": "insert into test (entry) values (%s);",
"args": n
} for n in range(10)
]
asyncio.run(AsyncDBPool(5).execute_all(queries))
I am not getting any errors but the queries are not committing...
Update: I was able to figure it out, I did not open the pool within the execute_all function and did not await the gathering of tasks. If someone could explain why it didn't work for first time, that would be great :)
Below is the working code:
class AsyncDBPool:
def __init__(self, min_conn:int):
self.pool = psycopg_pool.AsyncConnectionPool(
conninfo="host={} port={} dbname={} user={} password={}" \
.format(POSTGRES_HOSTNAME, POSTGRES_PORT, POSTGRES_DBNAME, POSTGRES_USERNAME, POSTGRES_PASSWORD),
min_size= 5,
max_size= MAX_POOL_SIZE,
timeout=30,
max_idle= 300,
open=False
)
async def open_pool(self):
await self.pool.open()
await self.pool.wait()
print("Connection to database established")
async def execute(self, query:str, args:tuple=()):
async with self.pool.connection() as conn:
async with conn.cursor(row_factory=dict_row) as cur:
await cur.execute(query, (args,))
await conn.commit()
async def execute_all(self, queries:list):
#validation
assert isinstance(queries, list), "queries must be a list"
assert len(queries), "queries cannot be empty"
assert all(isinstance(q, dict) for q in queries), "queries must contain dict type values"
assert all("query" in q and "args" in q for q in queries), "queries objects must contain 'query' and 'args'"
# open pool, gather/execute tasks, close pool
await self.open_pool()
await asyncio.gather(*(self.execute(q['query'], q['args']) for q in queries), return_exceptions=True)
await self.pool.close()
if __name__ == "__main__":
queries = [
{
"query": "insert into test (entry) values (%s);",
"args": n
} for n in range(10)
]
asyncio.run(AsyncDBPool(5).execute_all(queries))