Tags: sqlalchemy, python-asyncio

SQLAlchemy: method '_connection_for_bind()' is already in progress


I recently updated SQLAlchemy (installed with the `[asyncio]` extra) to 1.4.46, and I now get the following exception when committing:

sqlalchemy.exc.IllegalStateChangeError: Method 'commit()' can't be called here; method '_connection_for_bind()' is already in progress and this would cause an unexpected state change to <SessionTransactionState.CLOSED: 5>

Before updating to the new version, it was working fine.

# -*- coding:utf-8 -*-

from sqlalchemy import exc, event, text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.pool import NullPool, Pool
from contextvars import ContextVar
from sanic import Sanic
import asyncio


class EngineNotInitialisedError(Exception):
    """Raised when a DBSession is entered before ``init_app`` has created its engine."""


class DBSessionContext:
    """State for one ``async with db:`` block: a lazily created session plus post-processing hooks.

    The session is created on first access of :attr:`query` and is shared by every
    coroutine that reaches this context object. asyncio tasks copy the current
    ``contextvars`` context when they are created, so coroutines started with
    ``asyncio.gather`` inside an ``async with db:`` block resolve to this same
    context and therefore the same session. A single session must not be used
    concurrently; doing so is what triggers SQLAlchemy's ``IllegalStateChangeError``
    ("method ... is already in progress"). Give each concurrent task its own session.
    """

    def __init__(self, session: "sessionmaker", commit_on_exit: bool = True) -> None:
        # Session *factory*; it is called lazily by ``query``.
        self.session = session
        self._query = None
        # NOTE(review): stored but not consulted by ``close()``, which always commits
        # on success — confirm whether that is intended.
        self.commit_on_exit = commit_on_exit
        # ContextVar token, used by the owner to restore the previous context on exit.
        self.token = None
        # Callbacks (or awaitables) run after a successful commit; dropped on rollback.
        self._post_processing: list = []

    async def close(self, exc_type=None, exc_value=None, traceback=None):
        """Finish the unit of work and close the session, if one was opened.

        If a session exists, it is rolled back when ``exc_value`` is set and its
        ``status_code`` attribute (default 500) is above 300; otherwise it is
        committed. Post-processing runs only after a commit and is discarded after
        a rollback. The session is closed even when commit, rollback or
        post-processing raises. Any callbacks still registered afterwards (e.g. when
        no session was ever opened) run last.
        """
        if self._query is not None:
            try:
                if exc_value and getattr(exc_value, 'status_code', 500) > 300:
                    await self._query.rollback()
                    self._post_processing.clear()
                else:
                    await self._query.commit()
                    await self.run_post_processing()
            finally:
                # Never leak the session, even if the commit/rollback failed.
                await self._query.close()

        if self._post_processing:
            await self.run_post_processing()

    async def run_post_processing(self):
        """Run and drain the registered post-processing items in FIFO order.

        An item that is already awaitable is awaited directly; otherwise it is
        called with no arguments and its result is awaited if awaitable. Items
        registered while draining are also run.
        """
        import inspect

        while self._post_processing:
            item = self._post_processing.pop(0)
            if not inspect.isawaitable(item):
                item = item()
            if inspect.isawaitable(item):
                await item

    def set_token(self, token):
        """Remember the ContextVar token so the owner can reset the variable on exit."""
        self.token = token

    @property
    def query(self) -> "AsyncSession":
        """Return this context's session, creating it from the factory on first use."""
        if self._query is None:
            self._query = self.session()

        return self._query


class AsyncSession(SQLAlchemyAsyncSession):
    """AsyncSession that accepts raw SQL strings and retries once on a MySQL lock wait timeout."""

    async def execute(self, statement, params=None, *, execution_options=None,
                      bind_arguments=None, **parameters):
        """Execute ``statement``, wrapping plain strings in ``text()``.

        Bind parameters may be given as keyword arguments (``execute(sql, id=1)``)
        or, like the base class, as the ``params`` mapping; mixing both raises
        ``TypeError``. ``execution_options`` and ``bind_arguments`` are forwarded
        unchanged. Forwarding them matters because SQLAlchemy's own helpers may call
        ``self.execute`` with these keywords, and they must not be mistaken for
        bind parameters.

        On ``OperationalError`` with MySQL error code 1205 (lock wait timeout) the
        session is rolled back and the statement is executed once more; any other
        error is re-raised.
        """
        if isinstance(statement, str):
            # We wrap around the `text()` method automatically
            statement = text(statement)

        if params is None:
            params = parameters
        elif parameters:
            raise TypeError(
                "pass bind parameters either as 'params' or as keyword arguments, not both"
            )

        # Only forward options the caller supplied so SQLAlchemy's own defaults apply.
        options = {}
        if execution_options is not None:
            options["execution_options"] = execution_options
        if bind_arguments is not None:
            options["bind_arguments"] = bind_arguments

        try:
            return await super().execute(statement, params, **options)
        except exc.OperationalError as e:
            orig_args = getattr(e.orig, "args", None) or ()
            if orig_args and orig_args[0] == 1205:
                # Lock wait timeout exceeded.
                # NOTE(review): the rollback discards every earlier statement of this
                # transaction, yet only this statement is retried — a later commit may
                # therefore persist partial work. Confirm this is acceptable.
                await self.rollback()
                return await super().execute(statement, params, **options)

            raise


class DBSession:
    """Holds the async engine and session factory and hands out one session per ``async with`` block.

    Usage::

        async with db:
            await db.query.execute('INSERT INTO ...')

    ``__aenter__`` creates a fresh DBSessionContext and binds it to a ContextVar;
    ``db.query`` returns that context's lazily created session. Coroutines started
    with ``asyncio.gather`` / ``asyncio.create_task`` inside the block inherit a copy
    of the current context, so they all resolve ``db.query`` to the SAME session.
    A session must not be used by concurrent tasks; that is what raises
    ``IllegalStateChangeError: ... '_connection_for_bind()' is already in progress``.
    Either await such calls one after another, or open a separate ``async with db:``
    block inside each task so every task gets its own session.
    """

    def __init__(self):
        self.engine = None
        # Session factory (``sessionmaker``); set by ``init_app``.
        self.session = None
        self._session = None
        # Default until ``init_app`` is called.
        self.commit_on_exit = True
        # DBSessionContext of the innermost active ``async with db:`` block (None outside).
        self.context = ContextVar("context", default=None)

    def init_app(self, app: Sanic, url: str, commit_on_exit: bool = True) -> None:
        """Create the async engine and session factory from ``url`` and the app config.

        ``DATABASE_ECHO`` / ``DATABASE_ECHO_POOL`` control SQL and pool logging.
        """
        self.commit_on_exit = commit_on_exit

        engine_args = {
            'echo': app.config.get('DATABASE_ECHO', cast=bool, default=False),
            'echo_pool': app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
            # NullPool does not pool: every checkout opens a new DBAPI connection
            # and every check-in closes it.
            'poolclass': NullPool,
            # if pool_class is not NullPool:

            # the number of connections to allow in connection pool “overflow”
            # 'max_overflow': app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
            # if True will enable the connection pool “pre-ping” feature that tests connections for liveness upon each checkout
            # 'pool_pre_ping': app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
            # the number of connections to keep open inside the connection pool
            # 'pool_size': app.config.get('DATABASE_POOL_SIZE', cast=int, default=5),
            # this setting causes the pool to recycle connections after the given number of seconds has passed
            # 'pool_recycle': app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=-1),
            # number of seconds to wait before giving up on getting a connection from the pool
            # 'pool_timeout': app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=3600),
        }

        self.engine = create_async_engine(
            url,
            **engine_args
        )

        self.session = sessionmaker(
            bind=self.engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=False
        )

    async def __aenter__(self):
        """Start a new unit of work and make it the current context.

        Raises:
            EngineNotInitialisedError: if ``init_app`` has not created the engine yet.
        """
        if not isinstance(self.engine, AsyncEngine):
            raise EngineNotInitialisedError

        session_ctx = DBSessionContext(self.session, self.commit_on_exit)
        session_ctx.set_token(self.context.set(session_ctx))

        return session_ctx

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Commit or roll back the current unit of work, then restore the previous context."""
        session_ctx = self.context.get()
        try:
            # shield() lets the commit/rollback finish even if this task is cancelled;
            # the cancellation itself still propagates to the caller.
            await asyncio.shield(session_ctx.close(exc_type, exc_value, traceback))
        finally:
            # Always restore the previous context, even if closing failed or was
            # cancelled; otherwise the ContextVar keeps pointing at a dead context.
            self.context.reset(session_ctx.token)

    @property
    def query(self) -> "AsyncSession":
        """Return the session of the innermost active ``async with db:`` block.

        Outside any such block the ContextVar holds its default ``None``, so this
        raises ``AttributeError``.
        """
        return self.context.get().query


@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    '''Listener for Pool checkout events that pings every connection before using.

    Implements the pessimistic disconnect handling strategy. See also:
    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic

    The ping runs on the raw DBAPI cursor, so a failure surfaces as the driver's
    own exception class, not as ``sqlalchemy.exc.OperationalError``. Catching only
    the SQLAlchemy class would never match. The MySQL error code is therefore read
    from ``args[0]`` of whatever was raised. Known "connection lost" codes become
    ``DisconnectionError``; anything else is re-raised unchanged.
    '''
    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")
    except Exception as ex:
        code = ex.args[0] if ex.args else None
        if code in (2006,   # MySQL server has gone away
                    2013,   # Lost connection to MySQL server during query
                    2055):  # Lost connection to MySQL server at '%s', system error: %d
            # caught by pool, which will retry with a new connection
            raise exc.DisconnectionError() from ex
        raise

    cursor.close()


# Module-wide database handle: configure it once via ``db.init_app(app, url)``,
# then open one unit of work per request with ``async with db:``.
db: DBSession = DBSession()

The code is called as follows:

async with db:
    await db.query.execute('INSERT INTO ...')

What is causing the IllegalStateChangeError I'm getting, and how can I avoid it?


Solution

  • A discussion on SQLAlchemy's GitHub repository explains why the issue occurs: https://github.com/sqlalchemy/sqlalchemy/discussions/9312

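    The explanation is that the code is calling something like

    asyncio.gather(func(session), func2(session))

    with the two functions sharing the same session, which causes the sqlalchemy.exc.IllegalStateChangeError. A session must not be used by several tasks at the same time; the upgraded SQLAlchemy version detects this overlapping use and raises the error instead of proceeding. In the code above this can happen even without passing the session explicitly. db.query is looked up through a ContextVar, and the tasks that asyncio.gather creates inside an `async with db:` block inherit a copy of that context, so they all get the same session.

    Removing the asyncio.gather call (awaiting the calls one after another) resolves the issue. Alternatively, use two sessions, one for each function.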