python-3.xasynchronouspython-asynciofastapiasyncpg

asyncpg - cannot perform operation: another operation is in progress


I am attempting to resolve the following error:

asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

Here is the full traceback:

Traceback (most recent call last):

  File "<string>", line 1, in <module>
  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               │     │   └ 4
               │     └ 7
               └ <function _main at 0x109c8aca0>
  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/spawn.py", line 129, in _main
    return self._bootstrap(parent_sentinel)
           │    │          └ 4
           │    └ <function BaseProcess._bootstrap at 0x109b1f8b0>
           └ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
    │    └ <function BaseProcess.run at 0x109b18ee0>
    └ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {'config': <uvicorn.config.Config object at 0x109cd55b0>, 'target': <bound method Server.run of <uvicorn.server.Server object...
    │    │        │    │        └ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
    │    │        │    └ ()
    │    │        └ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
    │    └ <function subprocess_started at 0x10a4aca60>
    └ <SpawnProcess name='SpawnProcess-4' parent=36604 started>
  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/uvicorn/subprocess.py", line 61, in subprocess_started
    target(sockets=sockets)
    │              └ [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 80)>]
    └ <bound method Server.run of <uvicorn.server.Server object at 0x109cd56a0>>
  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/uvicorn/server.py", line 48, in run
    loop.run_until_complete(self.serve(sockets=sockets))
    │    │                  │    │             └ [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 80)>]
    │    │                  │    └ <function Server.serve at 0x10a4abca0>
    │    │                  └ <uvicorn.server.Server object at 0x109cd56a0>
    │    └ <function BaseEventLoop.run_until_complete at 0x10a205820>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
    │    └ <function BaseEventLoop.run_forever at 0x10a205790>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
    │    └ <function BaseEventLoop._run_once at 0x10a209310>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once
    handle._run()
    │      └ <function Handle._run at 0x10a13ed30>
    └ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>
  File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
    │    │            │    │           │    └ <member '_args' of 'Handle' objects>
    │    │            │    │           └ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>
    │    │            │    └ <member '_callback' of 'Handle' objects>
    │    │            └ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>
    │    └ <member '_context' of 'Handle' objects>
    └ <Handle <TaskWakeupMethWrapper object at 0x10bb75a60>(<_GatheringFu...in progress')>)>

> File "./xxx/xxx/xxx.py", line 144, in get_disclosure_data
    hh_json, db_json = await asyncio.gather(*coroutines)
                             │       │       └ [<coroutine object xxxx at 0x10bb2cb40>, <coroutine object db_call at 0x10bb2cc40>]
                             │       └ <function gather at 0x10a1fad30>
                             └ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>

  File "./xxx/xxx/xxx.py", line 52, in db_call
    db_json = await asyncio.gather(*coroutines, loop=asyncio.get_event_loop())
                    │       │       │                │       └ <built-in function get_event_loop>
                    │       │       │                └ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>
                    │       │       └ [<coroutine object DBConnectionManager.fetch_item at 0x10bb434c0>, <coroutine object DBConnectionManager.fetch_item at 0x10bb...
                    │       └ <function gather at 0x10a1fad30>
                    └ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>

  File "./xxx/xxx/xx.py", line 97, in fetch_item
    await self._connection_pool.release(self.con)
          │    │                │       │    └ <PoolConnectionProxy [released] 0x10bbc9cd0>
          │    │                │       └ <chd_api.data.db.DBConnectionManager object at 0x10b946a30>
          │    │                └ <function Pool.release at 0x10b956a60>
          │    └ <asyncpg.pool.Pool object at 0x10bb131e0>
          └ <chd_api.data.db.DBConnectionManager object at 0x10b946a30>

  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 666, in release
    return await asyncio.shield(ch.release(timeout))
                 │       │      │  │       └ None
                 │       │      │  └ <function PoolConnectionHolder.release at 0x10b952e50>
                 │       │      └ <asyncpg.pool.PoolConnectionHolder object at 0x10bb2a5c0>
                 │       └ <function shield at 0x10a1faee0>
                 └ <module 'asyncio' from '/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/asyncio/__init__.py'>
  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 218, in release
    raise ex
  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/pool.py", line 208, in release
    await self._con.reset(timeout=budget)
          │    │                  └ None
          │    └ <member '_con' of 'PoolConnectionHolder' objects>
          └ <asyncpg.pool.PoolConnectionHolder object at 0x10bb2a5c0>
  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/connection.py", line 1311, in reset
    await self.execute(reset_query, timeout=timeout)
          │    │       │                    └ None
          │    │       └ 'SELECT pg_advisory_unlock_all();\nCLOSE ALL;\nUNLISTEN *;\nRESET ALL;'
          │    └ <function Connection.execute at 0x10b93f3a0>
          └ <asyncpg.connection.Connection object at 0x10bc34120>
  File "/Users/ddd/Desktop/repos/xxx/.venv/lib/python3.8/site-packages/asyncpg/connection.py", line 297, in execute
    return await self._protocol.query(query, timeout)
                 │    │               │      └ None
                 │    │               └ 'SELECT pg_advisory_unlock_all();\nCLOSE ALL;\nUNLISTEN *;\nRESET ALL;'
                 │    └ <member '_protocol' of 'Connection' objects>
                 └ <asyncpg.connection.Connection object at 0x10bc34120>
  File "asyncpg/protocol/protocol.pyx", line 321, in query
    self._check_state()
  File "asyncpg/protocol/protocol.pyx", line 684, in asyncpg.protocol.protocol.BaseProtocol._check_state
    raise apg_exc.InterfaceError(
          │       └ <class 'asyncpg.exceptions._base.InterfaceError'>
          └ <module 'asyncpg.exceptions' from '/Users/ddd/Desktop/repos/chd-api/.venv/lib/python3.8/site-packages/asyncpg/exception...

asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

where I have the following code to set up a connection pool and execute queries with connections in the pool:

class DBConnectionManager(object):
    """ Class for setting up and tearing down db connection """

    def __init__(self):
        self.host = SETTINGS.db_host
        self.database = SETTINGS.db_name
        self.user = SETTINGS.db_user
        self.password = SETTINGS.db_password
        self.port = "5432"

        self._connection_pool = None
        self.con = None

    async def connect(self):
        if not self._connection_pool:
            try:
                self._connection_pool = await asyncpg.create_pool(
                    host=self.host,
                    database=self.database,
                    user=self.user,
                    password=self.password,
                    port=self.port,
                    min_size=50,
                    max_size=100,
                )
                logger.info("Database pool connection opened")

            except Exception as e:
                logger.exception(e)

    async def fetch_item(self, query: str, *args):
        if not self._connection_pool:
            await self.connect()
        else:
            self.con = await self._connection_pool.acquire()
            try:
                result = await self.con.fetch(query, *args)
                return result
            except Exception as e:
                logger.exception(e)
            finally:
                await self._connection_pool.release(self.con)

    async def close(self):
        if not self._connection_pool:
            try:
                await self._connection_pool.close()
                logger.info("Database pool connection closed")
            except Exception as e:
                logger.exception(e)

and am attempting to execute some 22 database calls using the following:

async def db_call(db, lat, lng):
    """
    Performs the necessary db calls given a lat, lng

    Required Input:
        lat::float a latitude in decimal degrees. Must be specified with `lng` (i.e. 39.2994)
        lng::float a longitude in decimal degrees. Must be specified with `lat` (i.e. -122.33)

    Returns:
        dict
    """
    coroutines = []
    for table in db_map:

        # SQL columns
        db_fields = ",".join(
            [
                f"{col} AS {db_map[table]['fields'][col]}"
                for col in db_map[table]["fields"]
            ]
        )

        # Output names
        api_fields = [db_map[table]["fields"][col] for col in db_map[table]["fields"]]

        if db_map[table]["query_type"] == "pip":
            limit = db_map[table]["options"]["LIMIT"]
            query = f"SELECT {db_fields} from {table} WHERE (ST_Covers(geom, GeomFromEWKT('SRID=4326;POINT({lng} {lat})'))) LIMIT {limit};"

        else:
            distance = db_map[table]["options"]["DISTANCE"]
            geo2geo = f"geom::geography, GeomFromEWKT('SRID=4326;POINT({lng} {lat})')::geography"
            query = (
                f"SELECT {db_fields}, ST_Distance({geo2geo})"
                f"from {table} WHERE (ST_DWithin({geo2geo}, {distance}))"
                f"ORDER BY ST_Distance({geo2geo}) LIMIT 1;"
            )

        coroutines.append(db.fetch_item(query))

    db_res = await asyncio.gather(*coroutines)
    
    .... code for processing results

I have examined several issues on the asyncpg github concerning this error and am still not finding an appropriate solution. Note also, this call is being performed in FastAPI.

Why this error may be occurring and how to resolve it?


Solution

  • The assignment to self.con in fetch_item causes multiple coroutines to share the same connection. While you do want them to share the connection pool, sharing the same connection doesn't make sense, as the connection is stateful.

    To resolve the issue, replace usage of self.con with a local variable con.