pythonpython-asynciogeneratorcontextmanagerpsycopg3

How to DRY up this psycopg connection pool boilerplate code with a reusable async function or generator?


I'm using psycopg to connect to a PostgreSQL database using a connection pool. It works great, but any function that needs to run SQL in a transaction gets three extra layers of nesting:

/app/db.py

from os import getenv
from psycopg_pool import AsyncConnectionPool

pool = AsyncConnectionPool(getenv('POSTGRES_URL'))

/app/foo.py

from db import pool
from psycopg.rows import dict_row


async def create_foo(**kwargs):
    foo = {}
    async with pool.connection() as conn:
        async with conn.transaction():
            async with conn.cursor(row_factory=dict_row) as cur:
                # use cursor to execute SQL queries
    return foo


async def update_foo(foo_id, **kwargs):
    foo = {}
    async with pool.connection() as conn:
        async with conn.transaction():
            async with conn.cursor(row_factory=dict_row) as cur:
                # use cursor to execute SQL queries
    return foo

I wanted to abstract that away into a helper function, so I tried refactoring it:

/app/db.py

from contextlib import asynccontextmanager
from os import getenv
from psycopg_pool import AsyncConnectionPool

pool = AsyncConnectionPool(getenv('POSTGRES_URL'))


@asynccontextmanager
async def get_tx_cursor(**kwargs):
    async with pool.connection() as conn:
        conn.transaction()
        cur = conn.cursor(**kwargs)
        yield cur

...and calling it like this:

/app/foo.py

from db import get_tx_cursor
from psycopg.rows import dict_row


async def create_foo(**kwargs):
    foo = {}
    async with get_tx_cursor(row_factory=dict_row) as cur:
        # use cursor to execute SQL queries
    return foo

...but that resulted in an error:

TypeError: '_AsyncGeneratorContextManager' object does not support the context manager protocol

I also tried variations of the above, like this:

async def get_tx_cursor(**kwargs):
    async with pool.connection() as conn:
        async with conn.transaction():
            async with conn.cursor(**kwargs) as cur:
                yield cur

...but got similar results, so it appears using a generator is not possible.

Does anyone know of a clean and simple way to expose the cursor to a calling function, without using another library?

Here are the versions I'm using:


Solution

  • Without any extra helper, you might want to write the async with statement in the multiple parameter version - just separate each expression you put in your three with statements in the same statement, separated by commas.

    Note that by using this option, up to Python 3.11 (or maybe 3.10, I don't recall), you can't really add parentheses to enclose the three expressions and reformat your code, as the parser would not be able to distinguish them from a tuple. They'd have to be reformatted by the \ line continuator escape, or simply kept on the same line.

    But, as even this can be inconvenient boilerplate, the simplest way to combine various (fixed) context-managers is to create a context manager as a function, by using the tools provided in the stdlib's contextlib module, as bellow:

         
    ...
    import contextlib
    
    @contextlib.AsyncContextManager
    async def sql(pool):
        # everything in a single `with` statement:
        async with pool.connection() as conn, conn.transaction(), conn.cursor(row_factory=dict_row) as cur:
            yield (conn, cur)
            
    
    async def create_foo(**kwargs):
        foo = {}
        async with sql(pool) as (conn, cur):
            # code block using the SQL conn and cur objects:
            ...
    

    If you have some code where there are optional, or other context managers that could be data driven (for example various files to open simultaneously), them, inside the factored-out context manager, one should use contextlib's ExitStack class (or its counterpart AsyncExitStack). But that is not your case here.