python-3.x multithreading gevent pymssql

Python - SQL transaction errors in multithreading (pymssql library)


The following code is generating transaction errors.

The COMMIT TRANSACTION request has no corresponding BEGIN TRANSACTION.

The larger the gevent pool, the more frequent the errors.

What I do is:

  1. Spawn a greenlet from the gevent pool in my main function:
pool.spawn(collector, target_table, source_table, columns_to_copy, sql_filter)

  2. Inside the collector function, instantiate a mydb object, which sets up the DB connection:
def collector( target_table, source_table, columns_to_copy, sql_filter):
    mydb = MySQLClass(host=SQL_HOST, database_name='mydb', user=myuser, password=mypw)

    .....
    mydb.sql_delete(table, sql_where_filter)

Note that MySQLClass.__init__ is where I actually establish the connection:

class MySQLClass(object):
    def __init__(self, host, database_name, user, password):
        # One connection and one cursor per MySQLClass instance
        self.db = pymssql.connect(host=host,
                                  database=database_name,
                                  user=user,
                                  password=password)
        self.cursor = self.db.cursor()

  3. Call cursor.execute() with a DELETE statement inside the mydb.sql_delete method:

    def sql_delete(self, table, sql_filter=""):
        self.cursor.execute("DELETE FROM " + table + " " + sql_filter)
        self.db.commit()

With about 10 concurrent workers this barely ever occurs; with 20 or more it becomes increasingly frequent.

I thought the error originated in another part of the code, where I actually have a cursor.execute("BEGIN TRAN...."), but that is rarely the case, if ever.

Any ideas?


Solution

  • What worked for me in the end was using the backoff library to implement a reset on the db connection object (and therefore the cursor).

    The function that resets the object works about like this:

    def backoff_reset_db(details):
        """Handler that backoff calls before each retry.

        You don't need to know everything that is in details: 'args' and
        'kwargs' are the same arguments that you passed to the function
        myfunct below, intercepted by the backoff library so that you can
        work on them.
        """
        for db in details['args']:
            if isinstance(db, pymssql._pymssql.Connection):
                # Keyword arguments avoid relying on pymssql.connect's positional order
                db = pymssql.connect(host=host, database=dbname, user=user, password=passw)
        for _k, db in details['kwargs'].items():
            if isinstance(db, pymssql._pymssql.Connection):
                db = pymssql.connect(host=host, database=dbname, user=user, password=passw)
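
One detail worth checking in a handler like this: in Python, assigning to a loop variable only rebinds a local name, so the new connection has to be written back into a mutable container for anything outside the handler to see it. Below is a minimal plain-Python sketch of that difference. `FakeConnection`, `reset_by_rebinding` and `reset_in_place` are hypothetical names used only for illustration, and whether a retried call then picks up the replaced object depends on the retry library reusing the same kwargs dictionary, which is an assumption here.

```python
class FakeConnection:
    """Hypothetical stand-in for a pymssql connection object."""


def reset_by_rebinding(details):
    # Rebinds the local name `db` only; details['kwargs'] is left untouched.
    for _key, db in details["kwargs"].items():
        if isinstance(db, FakeConnection):
            db = FakeConnection()


def reset_in_place(details):
    # Writes the fresh object back into the dictionary itself.
    for key, db in list(details["kwargs"].items()):
        if isinstance(db, FakeConnection):
            details["kwargs"][key] = FakeConnection()


original = FakeConnection()
details = {"args": (), "kwargs": {"db_connection": original}}

reset_by_rebinding(details)
unchanged = details["kwargs"]["db_connection"] is original

reset_in_place(details)
replaced = details["kwargs"]["db_connection"] is not original
```

Positional arguments arrive as a tuple, which cannot be modified in place, so a reset along these lines would only apply to connections passed by keyword or held inside a mutable wrapper object.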
    

    At this point, assuming you have a function myfunct that is raising the transaction error, you can simply decorate the function with backoff:

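    import backoff

    # Any exception, max 5 tries, exponential wait, reset handler before each retry
    @backoff.on_exception(backoff.expo, Exception, max_tries=5, on_backoff=backoff_reset_db)
    def myfunct(db_connection, arg2, kwarg1,...):
        do_something_with(db_connection)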
    

    What this does: if the function raises an unhandled exception (any exception type in this case), backoff retries up to 5 times with an exponentially growing interval, and before each retry it calls backoff_reset_db. backoff_reset_db in turn takes any argument that is a pymssql connection object and reinstantiates it, thereby clearing any broken transaction.

    For me the transaction errors were less than 2%, but without reinstantiating the db object, if they failed once, they would always fail, so no number of retries would help.
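
The point about retries being useless without a reset can be illustrated with a small standard-library sketch. It does not use backoff or pymssql: `FlakyConnection`, `ConnectionHolder` and `call_with_retries` are hypothetical names, and a plain loop stands in for the decorator. The connection lives in a mutable holder so that a reset is visible to the next attempt.

```python
import time


class FlakyConnection:
    """Hypothetical connection that stays broken once its transaction state is bad."""

    def __init__(self, broken=False):
        self.broken = broken

    def delete_rows(self):
        if self.broken:
            raise RuntimeError("COMMIT TRANSACTION request has no corresponding BEGIN TRANSACTION")
        return "ok"


class ConnectionHolder:
    """Mutable wrapper, so replacing the connection is seen by the retried call."""

    def __init__(self, connection):
        self.connection = connection

    def reset(self):
        # A real version would open a new database connection here.
        self.connection = FlakyConnection(broken=False)


def call_with_retries(func, holder, max_tries=5, base_delay=0.001, reset=True):
    """Call func(holder); on failure optionally reset, wait exponentially longer, retry."""
    for attempt in range(1, max_tries + 1):
        try:
            return func(holder)
        except Exception:
            if attempt == max_tries:
                raise
            if reset:
                holder.reset()
            time.sleep(base_delay * 2 ** (attempt - 1))


def do_delete(holder):
    return holder.connection.delete_rows()
```

With `reset=True`, a holder that starts with a broken connection succeeds on the second attempt. With `reset=False`, the same call exhausts all tries and re-raises, which matches the behaviour described above.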