python python-asyncio mysql-connector

When using mysql.connector.aio how do we enable connection pooling (assuming it is needed)?


I am trying to port my old mysql connector code to use the asyncio libraries provided by MySQL. When I tried to run it, it said it didn't recognize the pool_name and pool_size.

The documentation doesn't explicitly state that pooling is unsupported. aiomysql does support pooling.

But I was also thinking, if I am running on a single thread, why would I need connection pooling? Maybe that's why it isn't explicitly supported by the MySQL AIO drivers?

There's a forum question, but it doesn't really address whether connection pooling is needed.

https://stackoverflow.com/a/66222924/242042 seems to indicate that connection pooling isn't worth it, but that could be specific to aiomysql.


Solution

  • First, as to whether mysql.connector.aio supports connection pooling, the following is a portion of that package's connect function (Python 3.12.2):

    async def connect(*args: Any, **kwargs: Any) -> MySQLConnectionAbstract:
        """Creates or gets a MySQL connection object.
    
        In its simpliest form, `connect()` will open a connection to a
        MySQL server and return a `MySQLConnectionAbstract` subclass
        object such as `MySQLConnection` or `CMySQLConnection`.
    
        When any connection pooling arguments are given, for example `pool_name`
        or `pool_size`, a pool is created or a previously one is used to return
        a `PooledMySQLConnection`.
    
        Args:
            *args: N/A.
            **kwargs: For a complete list of possible arguments, see [1]. If no arguments
                      are given, it uses the already configured or default values.
    
        Returns:
            A `MySQLConnectionAbstract` subclass instance (such as `MySQLConnection` or
            a `CMySQLConnection`) instance.
    
        Examples:
            A connection with the MySQL server can be established using either the
            `mysql.connector.connect()` method or a `MySQLConnectionAbstract` subclass:
            ```
            >>> from mysql.connector.aio import MySQLConnection, HAVE_CEXT
            >>>
            >>> cnx1 = await mysql.connector.aio.connect(user='joe', database='test')
            >>> cnx2 = MySQLConnection(user='joe', database='test')
            >>> await cnx2.connect()
            >>>
            >>> cnx3 = None
            >>> if HAVE_CEXT:
            >>>     from mysql.connector.aio import CMySQLConnection
            >>>     cnx3 = CMySQLConnection(user='joe', database='test')
            ```
    
        References:
            [1]: https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
        """
    

    The relevant comment is:

    When any connection pooling arguments are given, for example pool_name or pool_size, a pool is created or a previously one is used to return a PooledMySQLConnection.

    Note that pool_name is explicitly mentioned. Yet:

    >>> import asyncio
    >>> async def test():
    ...     from mysql.connector.aio import connect
    ...     conn = await connect(pool_name = 'some_name')
    ...
    >>> asyncio.run(test())
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "C:\Python312\Lib\asyncio\runners.py", line 194, in run
        return runner.run(main)
               ^^^^^^^^^^^^^^^^
      File "C:\Python312\Lib\asyncio\runners.py", line 118, in run
        return self._loop.run_until_complete(task)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "C:\Python312\Lib\asyncio\base_events.py", line 685, in run_until_complete
        return future.result()
               ^^^^^^^^^^^^^^^
      File "<stdin>", line 3, in test
      File "C:\Python312\Lib\site-packages\mysql\connector\aio\__init__.py", line 162, in connect
        cnx = MySQLConnection(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    TypeError: MySQLConnectionAbstract.__init__() got an unexpected keyword argument 'pool_name'
    >>>
    

    Notwithstanding the comment in the code, this is strong evidence that connection pooling cannot be used asynchronously with this package.

    As for the post at https://stackoverflow.com/a/66222924/242042, the OP states, "Turns out connection pooling are (sic) not worth it. Connection pooling can cause mysql8 alter table to lock forever." The OP provides no reference for the "alter table" problem and I could not find one, which does not mean that the problem does not exist.

    As for whether connection pooling is "worth it": opening and closing connections requires network activity, so you do save something by opening a connection once and reusing it. But there is a second reason for using connection pooling. You asked, "... if I am running on a single thread, why would I need connection pooling?" For the same reason you might want it if you were instead running multiple threads that need database access. Pooling not only lets you readily reuse connections, it is also a mechanism for limiting the number of connections that can be created, even if that means forcing a task that wants a connection to wait because all connections in the pool are currently in use.
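
    To illustrate the limiting effect on a single thread, here is a minimal sketch that needs no database at all. The pooled "connections" are placeholder strings and `worker` and `run_demo` are hypothetical names used only for this illustration. Six tasks compete for two pooled items, and the code records the largest number ever checked out at once.

```python
import asyncio


async def worker(pool: asyncio.Queue, stats: dict) -> None:
    # Suspends here whenever every pooled "connection" is checked out.
    conn = await pool.get()
    stats["in_use"] += 1
    stats["peak"] = max(stats["peak"], stats["in_use"])
    try:
        await asyncio.sleep(0.01)  # stand-in for awaiting a query
    finally:
        stats["in_use"] -= 1
        pool.put_nowait(conn)  # hand the connection to the next waiter


async def run_demo(pool_size: int, n_tasks: int) -> int:
    pool: asyncio.Queue = asyncio.Queue()
    for i in range(pool_size):
        pool.put_nowait(f"conn-{i}")  # placeholder strings, not real connections
    stats = {"in_use": 0, "peak": 0}
    await asyncio.gather(*(worker(pool, stats) for _ in range(n_tasks)))
    return stats["peak"]


if __name__ == "__main__":
    # Peak number of "connections" in use at the same time
    print(asyncio.run(run_demo(pool_size=2, n_tasks=6)))
```

    Even though everything runs on one thread, the tasks interleave at each await, so without the pool all six would hold a connection at once. With it, the peak stays at the pool size.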

    You could use aiomysql, but it is based on PyMySQL and not mysql.connector.

    Update

    Here is the general idea for creating an asynchronous connection pool using the mysql.connector.aio package. Just remember not to call close on connections obtained using this pool.

    import asyncio
    
    from mysql.connector.aio import connect
    
    class AsyncMySQLConnectionPool:
        def __init__(self, pool_size, **db_config):
            self._pool_size = pool_size
            self._db_config = db_config
            self._pool = asyncio.Queue()
            self._connections = []
            self._connection_count = 0  # Track live connections
    
        async def get_connection(self):
            """Get a connection from the pool."""
            # See if there is an immediately available connection:
            try:
                conn = self._pool.get_nowait()
            except asyncio.QueueEmpty:
                pass
            else:
                return conn
    
            # Here if there are no immediately available pool connections
            if self._connection_count < self._pool_size:
                # We must increment first since a task switch can occur during
                # the await below, letting another task try to acquire a new
                # connection:
                self._connection_count += 1
                try:
                    conn = await connect(**self._db_config)
                except Exception:
                    # Give the slot back if the connection attempt fails:
                    self._connection_count -= 1
                    raise
                self._connections.append(conn)
                return conn
    
            # pool size is at its maximum size, so we may have to
            # wait a while:
            return await self._pool.get()
    
        async def release_connection(self, conn):
            """Returns a connection to the pool."""
            # But first do a rollback:
            await conn.rollback()
            await conn.cmd_reset_connection()
            await self._pool.put(conn)
    
        async def close(self):
            """Closes all connections in the pool."""
    
            # Empty the pool of any connections that have been returned.
            # Ideally, this should be all the connections.
            while not self._pool.empty():
                self._pool.get_nowait()
    
            # Now shutdown all of the connections
            while self._connections:
                conn = self._connections.pop()
                self._connection_count -= 1
                # close() can and usually does result in stack traces being
                # printed on stderr even though no exception is raised. Better
                # to use shutdown, which does not try to send a QUIT command:
                await conn.shutdown()
    
    if __name__ == '__main__':
        async def main():
            USER = 'xxxxxxxx'
            PASSWORD = 'xxxxxxxx'
    
            # Create a connection pool with a max size of 3
            POOL_SIZE = 3
    
            pool = AsyncMySQLConnectionPool(
                pool_size=POOL_SIZE,
                host="localhost",
                user=USER,
                password=PASSWORD,
                database="test"
            )
    
            # Fill up the pool to max size:
            connections = [
                await pool.get_connection()
                for _ in range(POOL_SIZE)
            ]
    
            # Release them all:
            for conn in connections:
                await pool.release_connection(conn)
    
            for _ in range(6):
                conn = await pool.get_connection()
                cursor = await conn.cursor()
                await cursor.execute("SELECT 1")
                print(id(conn), await cursor.fetchone())
                await cursor.close()
                await pool.release_connection(conn)
    
            await pool.close()
    
            # Demonstrate that when a connection is returned to the pool
            # with an uncommitted transaction that it is rolled back
    
            # Create a pool with only one connection so that the same connection
            # is always used:
    
            pool = AsyncMySQLConnectionPool(
                pool_size=1,
                host="localhost",
                user=USER,
                password=PASSWORD,
                database="test"
            )
    
            conn = await pool.get_connection()
            cursor = await conn.cursor(dictionary=True)
            await cursor.execute('select * from test where id = 1')
            print(f'\nBefore update:\n{await cursor.fetchone()}')
            await cursor.execute('update test set value = 20.0 where id = 1')
            await cursor.execute('select * from test where id = 1')
            print(f'\nAfter update:\n{await cursor.fetchone()}')
            await pool.release_connection(conn)
    
            conn = await pool.get_connection()
            cursor = await conn.cursor(dictionary=True)
            await cursor.execute('select * from test where id = 1')
            print(f'\nAfter connection is returned to the pool without a commit:\n{await cursor.fetchone()}')
            await pool.release_connection(conn)
    
            await pool.close()
    
        asyncio.run(main())
    

    Prints:

    2766995397840 (1,)
    2766992650992 (1,)
    2766997102256 (1,)
    2766995397840 (1,)
    2766992650992 (1,)
    2766997102256 (1,)
    
    Before update:
    {'id': 1, 'the_date': datetime.date(2023, 3, 31), 'engine': 'engine_a', 'value': 30.0}
    
    After update:
    {'id': 1, 'the_date': datetime.date(2023, 3, 31), 'engine': 'engine_a', 'value': 20.0}
    
    After connection is returned to the pool without a commit:
    {'id': 1, 'the_date': datetime.date(2023, 3, 31), 'engine': 'engine_a', 'value': 30.0}
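
    Since connections from this pool must be released rather than closed, one way to make the release harder to forget is to wrap acquire and release in an async context manager. The following is only a sketch: `pooled_connection` is a hypothetical helper, and `FakePool` is a stand-in that mimics the `get_connection`/`release_connection` interface of the pool above so that the example runs without a database.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def pooled_connection(pool):
    """Acquire a connection and always hand it back, even on error."""
    conn = await pool.get_connection()
    try:
        yield conn
    finally:
        await pool.release_connection(conn)


class FakePool:
    """Hypothetical stand-in exposing the same two methods as the pool."""

    def __init__(self):
        self.released = []

    async def get_connection(self):
        return "fake-conn"

    async def release_connection(self, conn):
        self.released.append(conn)


async def demo():
    pool = FakePool()
    try:
        async with pooled_connection(pool) as conn:
            raise RuntimeError("query failed")  # simulate a failing query
    except RuntimeError:
        pass
    # The connection was released despite the exception.
    return pool.released


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

    With the real pool, the body of the `async with` block would create a cursor and run queries, and the rollback and reset in `release_connection` would still happen on exit.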