mysql sqlalchemy database-connection operationalerror

Python: SQLAlchemy - how do I ensure a connection is not stale using the new event system?


I am using the SQLAlchemy package in Python. I have an operation that takes some time to execute after I autoload an existing table. This causes the following error when I then try to use the connection:

sqlalchemy.exc.OperationalError: (OperationalError) (2006, 'MySQL server has gone away')

I have a simple utility function that performs a bulk insert (executemany):

from sqlalchemy import MetaData, Table, create_engine

def insert_data(data_2_insert, table_name):
    engine = create_engine('mysql://blah:blah123@localhost/dbname')
    # MetaData is a catalog of Table objects.
    metadata = MetaData()
    # Reflect the existing table's columns from the database.
    mytable = Table(table_name, metadata, autoload=True, autoload_with=engine)
    for c in mytable.c:
        print(c)
    column_names = tuple(c.name for c in mytable.c)
    # Map each row tuple to a {column_name: value} dict.
    final_data = [dict(zip(column_names, x)) for x in data_2_insert]
    ins = mytable.insert()
    conn = engine.connect()
    conn.execute(ins, final_data)
    conn.close()

It is the following line that takes a long time to execute, since 'data_2_insert' has 677,161 rows:

final_data = [dict(zip(column_names, x)) for x in data_2_insert]

I came across this question, which refers to a similar problem. However, I am not sure how to implement the connection management suggested by the accepted answer, because robots.jpg pointed this out in a comment:

Note for SQLAlchemy 0.7 - PoolListener is deprecated, but the same solution can be implemented using the new event system.

If someone could give me a few pointers on how to integrate these suggestions into the way I use SQLAlchemy, I would be very appreciative. Thank you.


Solution

  • I think you are looking for something like this:

    from sqlalchemy import exc, event
    from sqlalchemy.pool import Pool
    
    @event.listens_for(Pool, "checkout")
    def check_connection(dbapi_con, con_record, con_proxy):
        '''Listener for Pool checkout events that pings every connection before using.
        Implements pessimistic disconnect handling strategy. See also:
        http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''
    
        cursor = dbapi_con.cursor()
        try:
            cursor.execute("SELECT 1")  # could also be dbapi_con.ping(),
                                        # not sure what is better
        except Exception as ex:
            # dbapi_con is the raw DBAPI connection, so the driver's own
            # OperationalError is raised here, not sqlalchemy.exc.OperationalError.
            if ex.args and ex.args[0] in (
                    2006,   # MySQL server has gone away
                    2013,   # Lost connection to MySQL server during query
                    2055):  # Lost connection to MySQL server at '%s', system error: %d
                # caught by pool, which will retry with a new connection
                raise exc.DisconnectionError()
            else:
                raise
        cursor.close()
    

    If you wish to apply this strategy conditionally, skip the decorator and register the listener with the event.listen() function instead:

    # somewhere during app initialization
    if config.check_connection_on_checkout:
        event.listen(Pool, "checkout", check_connection)
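
To tie this back to the insert_data() function from the question, here is a minimal sketch of one way the pieces might fit together. It assumes a module-level engine that is created once and reused, and it registers the listener on that engine only rather than on every Pool. The in-memory SQLite URL and the demo table are stand-ins so the sketch runs without a MySQL server; they are not part of the original setup, and the ping here treats any failure as a disconnect instead of checking MySQL error codes.

```python
from sqlalchemy import MetaData, Table, create_engine, event, exc, text

# Assumption: in-memory SQLite replaces the MySQL URL so the sketch is runnable.
engine = create_engine("sqlite://")


def check_connection(dbapi_con, con_record, con_proxy):
    """Ping the raw DBAPI connection each time it is checked out of the pool."""
    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")
    except Exception:
        # Simplification: any ping failure is treated as a disconnect.
        # The pool catches DisconnectionError and retries with a new connection.
        raise exc.DisconnectionError()
    cursor.close()


# Pool events can be registered on a single engine instead of the Pool class.
event.listen(engine, "checkout", check_connection)


def insert_data(data_2_insert, table_name):
    """Reflect table_name, map row tuples to dicts, and bulk insert them."""
    metadata = MetaData()
    mytable = Table(table_name, metadata, autoload_with=engine)
    column_names = tuple(c.name for c in mytable.c)
    final_data = [dict(zip(column_names, row)) for row in data_2_insert]
    # The connection is checked out (and pinged) only after the slow step above.
    with engine.begin() as conn:
        conn.execute(mytable.insert(), final_data)
    return len(final_data)


# Hypothetical demo table and rows, used only to exercise the helper.
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE demo (id INTEGER, name TEXT)"))
inserted = insert_data([(1, "a"), (2, "b")], "demo")
```

Because the ping happens at checkout, the connection used for the insert is verified after the long list comprehension has finished, not before it. On SQLAlchemy 1.2 and later, passing pool_pre_ping=True to create_engine() enables a built-in version of the same pessimistic check, so a custom listener may not be needed there.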
    

    More info: