pythonmysqlmultithreadingsqlalchemydeadlock

MySQL Deadlock when using DataFrame.to_sql in multithreaded environment


I have a multithreaded ETL process inside a docker container that looks like this simplified code:

class Query(abc.ABC):
    def __init__(self):
        self.connection = sqlalchemy.create_engine(MYSQL_CONNECTION_STR)

    def load(self, df: pd.DataFrame) -> None:
        df.to_sql(
            name=self.table, con=self.connection, if_exists="replace", index=False,
        )

    @abc.abstractmethod
    def transform(self, data: object) -> pd.DataFrame:
        pass

    @abc.abstractmethod
    def extract(self) -> object:
        pass

    #  other methods...


class ComplianceInfluxQuery(Query):
    # Implements abstract methods... load method is the same as Query class


ALL_QUERIES = [ComplianceInfluxQuery("cc_influx_share_count"),ComplianceInfluxQuery("cc_influx_we_count")....]


while True:
    with ThreadPoolExecutor(max_workers=8) as pool:
        for query in ALL_QUERIES:
            pool.submit(execute_etl, query) # execute_etl function calls extract, transform and load

Many classes inherit from Query, with the same implementation for load() as shown in class Query which simply loads a pandas DataFrame object to an sql table, and replacing the table if it exists.

All the classes run concurrently and load the results to MySQL database after they finish Extract() and Transform(). Every class loads a different table to the database.

Quite often I get a deadlock from a random thread when the load() method is called:

2020-09-17 09:48:28,138 | INFO | root | query | 44 | 1 | ComplianceInfluxQuery.load()
2020-09-17 09:48:28,160 | INFO | root | query | 44 | 1 | ComplianceInfluxQuery.load()
2020-09-17 09:48:28,241 | ERROR | root | calculate | 124 | 1 | Failed to execute query ComplianceInfluxQuery
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 163, in execute
    result = self._query(query)
  File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 321, in _query
    conn.query(q)
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 505, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 724, in _read_query_result
    result.read()
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 1069, in read
    first_packet = self.connection._read_packet()
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 676, in _read_packet
    packet.raise_for_error()
  File "/usr/local/lib/python3.8/site-packages/pymysql/protocol.py", line 223, in raise_for_error
    err.raise_mysql_exception(self._data)
  File "/usr/local/lib/python3.8/site-packages/pymysql/err.py", line 107, in raise_mysql_exception
    raise errorclass(errno, errval)
pymysql.err.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "calculate.py", line 119, in execute_etl
    query.run()
  File "/chil_etl/query.py", line 45, in run
    self.load(df)
  File "/chil_etl/query.py", line 22, in load
    df.to_sql(
  File "/usr/local/lib/python3.8/site-packages/pandas/core/generic.py", line 2653, in to_sql
    sql.to_sql(
  File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 512, in to_sql
    pandas_sql.to_sql(
  File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 1316, in to_sql
    table.create()
  File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 649, in create
    self._execute_create()
  File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 641, in _execute_create
    self.table.create()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/schema.py", line 927, in create
    bind._run_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2097, in _run_visitor
    conn._run_visitor(visitorcallable, element, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1656, in _run_visitor
    visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py", line 145, in traverse_single
    return meth(obj, **kw)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 827, in visit_table
    self.connection.execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 72, in _execute_on_connection
    return connection._execute_ddl(self, multiparams, params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1068, in _execute_ddl
    ret = self._execute_context(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 163, in execute
    result = self._query(query)
  File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 321, in _query
    conn.query(q)
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 505, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 724, in _read_query_result
    result.read()
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 1069, in read
    first_packet = self.connection._read_packet()
  File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 676, in _read_packet
    packet.raise_for_error()
  File "/usr/local/lib/python3.8/site-packages/pymysql/protocol.py", line 223, in raise_for_error
    err.raise_mysql_exception(self._data)
  File "/usr/local/lib/python3.8/site-packages/pymysql/err.py", line 107, in raise_mysql_exception
    raise errorclass(errno, errval)
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: 
CREATE TABLE cc_influx_share_count (
    unique_identifier TEXT, 
    nfs_share_count FLOAT(53), 
    smb_share_count FLOAT(53), 
    s3_bucket_count FLOAT(53)
)

]
(Background on this error at: http://sqlalche.me/e/13/e3q8)

The log shows the load() method called by two threads at almost the same time. This could happen in all of the classes regardless of the data.

I ran the command SHOW ENGINE INNODB STATUS and no deadlocks were listed there.

I checked the general_log table to understand what happened during the deadlock better, but didn't notice anything useful apart from the fact that the thread which deadlocked did not inserted any values to the table cc_influx_share_count when (i think) it should have:

SELECT * FROM mysql.general_log WHERE event_time >= "2020-09-17 09:48:27" AND event_time <= "2020-09-17 09:48:29" ORDER BY event_time ASC;

Output (sensitive data removed):

Time: 2020-09-17 09:48:27.010747. Event: COMMIT


Time: 2020-09-17 09:48:27.011075. Event: ROLLBACK


Time: 2020-09-17 09:48:27.012042. Event: CREATE TABLE cc_influx_last_metric_time (
    unique_identifier TEXT, 
    timestamp TIMESTAMP NULL, 
    uptime BIGINT
)


Time: 2020-09-17 09:48:27.033973. Event: COMMIT


Time: 2020-09-17 09:48:27.034327. Event: ROLLBACK


Time: 2020-09-17 09:48:27.053837. Event: INSERT INTO cc_influx_last_metric_time (unique_identifier, timestamp, uptime) VALUES (...)


Time: 2020-09-17 09:48:27.066930. Event: COMMIT


Time: 2020-09-17 09:48:27.068657. Event: ROLLBACK


Time: 2020-09-17 09:48:27.887579. Event: DESCRIBE `container_post_deployments`


Time: 2020-09-17 09:48:27.889705. Event: ROLLBACK


Time: 2020-09-17 09:48:27.890186. Event: DESCRIBE `container_post_deployments`


Time: 2020-09-17 09:48:27.892125. Event: ROLLBACK


Time: 2020-09-17 09:48:27.892619. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:27.894964. Event: SHOW CREATE TABLE `container_post_deployments`


Time: 2020-09-17 09:48:27.896491. Event: ROLLBACK


Time: 2020-09-17 09:48:27.897097. Event: DROP TABLE container_post_deployments


Time: 2020-09-17 09:48:27.907816. Event: COMMIT


Time: 2020-09-17 09:48:27.908322. Event: ROLLBACK


Time: 2020-09-17 09:48:27.909890. Event: CREATE TABLE container_post_deployments (
    image TEXT, 
    `clientId` TEXT, 
    message TEXT, 
    timestamp TIMESTAMP NULL, 
    status_code BIGINT, 
    something TEXT, 
    user_agent TEXT
)


Time: 2020-09-17 09:48:27.928665. Event: COMMIT


Time: 2020-09-17 09:48:27.929089. Event: ROLLBACK


Time: 2020-09-17 09:48:27.932310. Event: INSERT INTO container_post_deployments (image, `clientId`, message, timestamp, status_code, something, user_agent) VALUES (...)


Time: 2020-09-17 09:48:27.934410. Event: COMMIT


Time: 2020-09-17 09:48:27.936774. Event: ROLLBACK


Time: 2020-09-17 09:48:28.140219. Event: DESCRIBE `cc_influx_share_count`


Time: 2020-09-17 09:48:28.163517. Event: DESCRIBE `cc_influx_we_count`


Time: 2020-09-17 09:48:28.166070. Event: ROLLBACK


Time: 2020-09-17 09:48:28.168159. Event: DESCRIBE `cc_influx_share_count`


Time: 2020-09-17 09:48:28.169895. Event: ROLLBACK


Time: 2020-09-17 09:48:28.170583. Event: DESCRIBE `cc_influx_we_count`


Time: 2020-09-17 09:48:28.174444. Event: ROLLBACK


Time: 2020-09-17 09:48:28.176339. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:28.177915. Event: ROLLBACK


Time: 2020-09-17 09:48:28.179331. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:28.182284. Event: SHOW CREATE TABLE `cc_influx_share_count`


Time: 2020-09-17 09:48:28.185154. Event: ROLLBACK


Time: 2020-09-17 09:48:28.192493. Event: SHOW CREATE TABLE `cc_influx_we_count`


Time: 2020-09-17 09:48:28.192887. Event: DROP TABLE cc_influx_share_count


Time: 2020-09-17 09:48:28.194530. Event: ROLLBACK


Time: 2020-09-17 09:48:28.195707. Event: DROP TABLE cc_influx_we_count


Time: 2020-09-17 09:48:28.207712. Event: COMMIT


Time: 2020-09-17 09:48:28.208141. Event: ROLLBACK


Time: 2020-09-17 09:48:28.210087. Event: CREATE TABLE cc_influx_share_count (
    unique_identifier TEXT, 
    nfs_share_count FLOAT(53), 
    smb_share_count FLOAT(53), 
    s3_bucket_count FLOAT(53)
)


Time: 2020-09-17 09:48:28.215350. Event: COMMIT


Time: 2020-09-17 09:48:28.216115. Event: ROLLBACK


Time: 2020-09-17 09:48:28.217996. Event: CREATE TABLE cc_influx_we_count (
    unique_identifier TEXT, 
    timestamp TIMESTAMP NULL, 
    `ANF` FLOAT(53), 
    `S3` FLOAT(53), 
    `CVO` FLOAT(53)
)


Time: 2020-09-17 09:48:28.240455. Event: ROLLBACK


Time: 2020-09-17 09:48:28.240908. Event: ROLLBACK


Time: 2020-09-17 09:48:28.244425. Event: COMMIT


Time: 2020-09-17 09:48:28.244965. Event: ROLLBACK


Time: 2020-09-17 09:48:28.249009. Event: INSERT INTO cc_influx_we_count (unique_identifier, timestamp, `ANF`, `S3`, `CVO`) VALUES (...)


Time: 2020-09-17 09:48:28.253638. Event: COMMIT


Time: 2020-09-17 09:48:28.256299. Event: ROLLBACK


Time: 2020-09-17 09:48:28.525814. Event: DESCRIBE `cc_influx_disk_usage`


Time: 2020-09-17 09:48:28.530211. Event: ROLLBACK


Time: 2020-09-17 09:48:28.532392. Event: DESCRIBE `cc_influx_disk_usage`


Time: 2020-09-17 09:48:28.539685. Event: ROLLBACK


Time: 2020-09-17 09:48:28.541868. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:28.560271. Event: SHOW CREATE TABLE `cc_influx_disk_usage`


Time: 2020-09-17 09:48:28.565451. Event: ROLLBACK


Time: 2020-09-17 09:48:28.569257. Event: DROP TABLE cc_influx_disk_usage


Time: 2020-09-17 09:48:28.585562. Event: COMMIT


Time: 2020-09-17 09:48:28.595193. Event: ROLLBACK


Time: 2020-09-17 09:48:28.598230. Event: CREATE TABLE cc_influx_disk_usage (
    unique_identifier TEXT, 
    timestamp TIMESTAMP NULL, 
    total_gb FLOAT(53), 
    used_gb FLOAT(53)
)


Time: 2020-09-17 09:48:28.619580. Event: COMMIT


Time: 2020-09-17 09:48:28.620411. Event: ROLLBACK


Time: 2020-09-17 09:48:28.625385. Event: INSERT INTO cc_influx_disk_usage (unique_identifier, timestamp, total_gb, used_gb) VALUES (....)


Time: 2020-09-17 09:48:28.628706. Event: COMMIT


Time: 2020-09-17 09:48:28.631955. Event: ROLLBACK


Time: 2020-09-17 09:48:28.840143. Event: DESCRIBE `cc_influx_aws_subscription`


Time: 2020-09-17 09:48:28.844303. Event: ROLLBACK


Time: 2020-09-17 09:48:28.845637. Event: DESCRIBE `cc_influx_aws_subscription`


Time: 2020-09-17 09:48:28.848076. Event: ROLLBACK


Time: 2020-09-17 09:48:28.848646. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:28.851165. Event: SHOW CREATE TABLE `cc_influx_aws_subscription`


Time: 2020-09-17 09:48:28.852202. Event: ROLLBACK


Time: 2020-09-17 09:48:28.852691. Event: DROP TABLE cc_influx_aws_subscription


Time: 2020-09-17 09:48:28.861657. Event: COMMIT


Time: 2020-09-17 09:48:28.862099. Event: ROLLBACK


Time: 2020-09-17 09:48:28.863288. Event: CREATE TABLE cc_influx_aws_subscription (
    unique_identifier TEXT, 
    timestamp TIMESTAMP NULL, 
    is_subscribed BIGINT
)


Time: 2020-09-17 09:48:28.878554. Event: COMMIT


Time: 2020-09-17 09:48:28.879113. Event: ROLLBACK


Time: 2020-09-17 09:48:28.881054. Event: INSERT INTO cc_influx_aws_subscription (unique_identifier, timestamp, is_subscribed) VALUES (....)


Time: 2020-09-17 09:48:28.882642. Event: COMMIT


Time: 2020-09-17 09:48:28.884614. Event: ROLLBACK


Time: 2020-09-17 09:48:28.918677. Event: DESCRIBE `hubspot_data`


Time: 2020-09-17 09:48:28.922938. Event: ROLLBACK


Time: 2020-09-17 09:48:28.923993. Event: DESCRIBE `hubspot_data`


Time: 2020-09-17 09:48:28.928181. Event: ROLLBACK


Time: 2020-09-17 09:48:28.928808. Event: SHOW FULL TABLES FROM `chil_etl`


Time: 2020-09-17 09:48:28.931225. Event: SHOW CREATE TABLE `hubspot_data`


Time: 2020-09-17 09:48:28.934269. Event: ROLLBACK


Time: 2020-09-17 09:48:28.934851. Event: DROP TABLE hubspot_data


Time: 2020-09-17 09:48:28.949309. Event: COMMIT


Time: 2020-09-17 09:48:28.949778. Event: ROLLBACK


Time: 2020-09-17 09:48:28.953829. Event: CREATE TABLE hubspot_data (...)


Time: 2020-09-17 09:48:28.973177. Event: COMMIT


Time: 2020-09-17 09:48:28.973652. Event: ROLLBACK

This ETL is the only process running MySQL. I have read the documentation about why deadlocks occur but I can't understand how two different tables with no connection between them can cause a deadlock. I know I can simply run the load() method again until it succeeds but I want to understand why the deadlocks occur, and how to prevent them.

MySQL version is 8.0.21. python 3.8.4. sqlalchemy 1.3.19. pandas 1.0.5. PyMySQL 0.10.1.


Solution

  • A possible solution I found for this issue was a retry mechanism. If a deadlock occurs - sleep and try a few more times until success while keeping the DF in memory:

    class Query(abc.ABC):
        def __init__(self):
            self.engine = MysqlEngine.engine()
    
        ....
        ....
    
        def load(self, df: pd.DataFrame) -> None:
            for i in range(5):  # If load fails due to a deadlock, try 4 more times
                try:
                    df.to_sql(
                        name=self.table,
                        con=self.engine.connect(),
                        if_exists="replace",
                        index=False,
                    )
                    return
                except sqlalchemy.exc.OperationalError as ex:
                    if "1213" in repr(ex):
                        logging.warning(
                            "Failed to acquire lock for %s", self.__class__.__name__
                        )
                    sleep(1)
    

    The deadlocks still occur and you lose some performance, but it beats doing the entire Extrac - Transform all over again.