python python-asyncio psycopg3

How to run multiple queries using psycopg AsyncConnectionPool


I would like to run 5 queries asynchronously using psycopg_pool. I am not entirely familiar with asyncio and I am running into an issue. I am hoping someone can help explain and debug my code below, as I am not sure what I am doing wrong.

import asyncio

import psycopg
import psycopg_pool
from psycopg.rows import dict_row

class AsyncDBPool:
    """Wrapper around a ``psycopg_pool.AsyncConnectionPool`` (version from the question).

    This is the version that does not commit anything; the NOTE(review)
    comments below point at the lines responsible.
    """
    def __init__(self, pool_size:int):
        # The pool is constructed with open=False, so it has to be opened
        # explicitly (see open_pool) before connections can be handed out.
        self.pool = psycopg_pool.AsyncConnectionPool(
            conninfo="host={} port={} dbname={} user={} password={}" \
                .format(HOSTNAME, PORT, DBNAME, USERNAME, PASSWORD),
            min_size= 1,
            max_size= pool_size,
            timeout=30,
            open=False
        )
    
    async def open_pool(self):
        """Await ``pool.open()`` followed by ``pool.wait()``."""
        await self.pool.open()
        await self.pool.wait()

    async def execute(self, query:str, args:tuple=()):
        """Run one statement on a pooled connection and commit it.

        ``args`` is forwarded to ``cur.execute`` unchanged.
        """
        async with self.pool.connection() as conn:
            async with conn.cursor(row_factory=dict_row) as cur:
                # NOTE(review): the script below passes a bare int as ``args``;
                # presumably psycopg expects a sequence or mapping here -- confirm.
                await cur.execute(query, args)
                await conn.commit()
    
    async def execute_all(self, queries:list):
        """Schedule ``execute`` for every ``{'query', 'args'}`` dict in ``queries``."""
        assert isinstance(queries, list), "must be list"
        # NOTE(review): nothing here calls open_pool(), although the pool was
        # created with open=False.
        # NOTE(review): the awaitable returned by asyncio.gather() is never
        # awaited, so this coroutine returns immediately and asyncio.run() in
        # the script can finish before any query has executed.
        # return_exceptions=True collects exceptions into the (discarded)
        # result instead of raising them, so no error is ever surfaced.
        asyncio.gather(*(self.execute(qa['query'], qa['args']) for qa in queries), return_exceptions=True)

        
if __name__ == "__main__":
    # Ten insert statements, one per value of n in range(10).
    # Each entry carries the SQL text under "query" and its parameter under "args".
    queries = [
        {
            "query": "insert into test (entry) values (%s);", 
            "args": n 
        } for n in range(10)
    ]
    # Build a pool with max_size=5 and drive execute_all to completion.
    asyncio.run(AsyncDBPool(5).execute_all(queries))

I am not getting any errors but the queries are not committing...


Solution

  • Update: I was able to figure it out: I had not opened the pool inside the execute_all function, and I had not awaited the gathered tasks. If someone could explain why it didn't work the first time, that would be great :)

    Below is the working code:

    class AsyncDBPool:
        """Run batches of SQL statements concurrently on a psycopg async pool.

        The pool is created closed (``open=False``); ``execute_all`` opens it,
        runs every statement concurrently and closes it again.
        """

        def __init__(self, min_conn: int):
            """Create the (still closed) connection pool.

            Args:
                min_conn: Minimum number of connections the pool maintains.
            """
            self.pool = psycopg_pool.AsyncConnectionPool(
                conninfo="host={} port={} dbname={} user={} password={}" \
                    .format(POSTGRES_HOSTNAME, POSTGRES_PORT, POSTGRES_DBNAME, POSTGRES_USERNAME, POSTGRES_PASSWORD),
                # Honour the constructor argument instead of a hard-coded 5.
                min_size=min_conn,
                max_size=MAX_POOL_SIZE,
                timeout=30,
                max_idle=300,
                open=False,
            )

        async def open_pool(self):
            """Open the pool and wait on it before reporting success."""
            await self.pool.open()
            await self.pool.wait()
            print("Connection to database established")

        async def execute(self, query: str, args: tuple = ()):
            """Execute one statement on a pooled connection and commit it.

            Args:
                query: SQL text, using ``%s`` placeholders.
                args: Parameters for the placeholders. A tuple is used as the
                    parameter sequence as-is; any other value is treated as a
                    single parameter and wrapped in a one-element tuple.
            """
            # Wrapping unconditionally turned the default ``()`` into ``((),)``,
            # i.e. one bogus parameter for a statement without placeholders.
            params = args if isinstance(args, tuple) else (args,)
            async with self.pool.connection() as conn:
                async with conn.cursor(row_factory=dict_row) as cur:
                    await cur.execute(query, params)
                    await conn.commit()

        async def execute_all(self, queries: list):
            """Open the pool, run all queries concurrently, then close the pool.

            Args:
                queries: Non-empty list of dicts, each with a ``'query'`` (SQL
                    text) and an ``'args'`` (parameters) entry.

            Returns:
                A list with one entry per query, in input order: ``None`` for a
                statement that succeeded, or the exception instance it raised.

            Raises:
                TypeError: If ``queries`` is not a list of dicts.
                ValueError: If ``queries`` is empty or an entry lacks a key.
            """
            # Explicit raises rather than ``assert``: asserts vanish under ``-O``.
            if not isinstance(queries, list):
                raise TypeError("queries must be a list")
            if not queries:
                raise ValueError("queries cannot be empty")
            if not all(isinstance(q, dict) for q in queries):
                raise TypeError("queries must contain dict type values")
            if not all("query" in q and "args" in q for q in queries):
                raise ValueError("queries objects must contain 'query' and 'args'")

            try:
                await self.open_pool()
                # gather() must be awaited, otherwise this coroutine returns
                # before the statements have run. Failures are collected rather
                # than raised so one bad statement does not abort the others;
                # they are handed back to the caller instead of being dropped.
                return await asyncio.gather(
                    *(self.execute(q["query"], q["args"]) for q in queries),
                    return_exceptions=True,
                )
            finally:
                # Always release the connections, even when opening or
                # gathering fails.
                await self.pool.close()
            
    if __name__ == "__main__":
        # One parameterised insert per value 0..9; each dict carries the SQL
        # text under "query" and its single parameter under "args".
        insert_sql = "insert into test (entry) values (%s);"
        queries = [{"query": insert_sql, "args": value} for value in range(10)]

        # Build the pool wrapper, then drive the whole batch to completion.
        db = AsyncDBPool(5)
        asyncio.run(db.execute_all(queries))