I have a multithreaded ETL process running inside a Docker container. Simplified, the code looks like this:
import abc
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import sqlalchemy


class Query(abc.ABC):
    def __init__(self):
        self.connection = sqlalchemy.create_engine(MYSQL_CONNECTION_STR)

    def load(self, df: pd.DataFrame) -> None:
        # self.table is set by the concrete subclasses
        df.to_sql(
            name=self.table, con=self.connection, if_exists="replace", index=False,
        )

    @abc.abstractmethod
    def transform(self, data: object) -> pd.DataFrame:
        pass

    @abc.abstractmethod
    def extract(self) -> object:
        pass

    # other methods...


class ComplianceInfluxQuery(Query):
    # Implements the abstract methods... load() is inherited unchanged from Query
    ...


ALL_QUERIES = [ComplianceInfluxQuery("cc_influx_share_count"), ComplianceInfluxQuery("cc_influx_we_count"), ...]

while True:
    with ThreadPoolExecutor(max_workers=8) as pool:
        for query in ALL_QUERIES:
            pool.submit(execute_etl, query)  # execute_etl calls extract, transform and load
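execute_etl just chains the three steps for one query object. A simplified sketch of it (in the real code this sits behind query.run() with logging and error handling around it, as the traceback below shows):

def execute_etl(query: Query) -> None:
    # Simplified: run the full pipeline for one query object
    data = query.extract()
    df = query.transform(data)
    query.load(df)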
Many classes inherit from Query, all with the same load() implementation shown above, which simply writes a pandas DataFrame to a SQL table, replacing the table if it already exists. All the classes run concurrently and load their results into the MySQL database once they finish extract() and transform(). Every class loads a different table into the database.
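Worth noting: with if_exists="replace", pandas does not just overwrite the rows. It checks whether the table exists, drops it, recreates it from the DataFrame dtypes, and only then inserts, which is exactly the DESCRIBE / DROP TABLE / CREATE TABLE / INSERT sequence that shows up in the general_log further down. A rough sketch of the equivalent steps (simplified, not the actual pandas internals):

def replace_table(df: pd.DataFrame, table: str, engine: sqlalchemy.engine.Engine) -> None:
    # Approximation of df.to_sql(name=table, con=engine, if_exists="replace", index=False)
    with engine.connect() as conn:
        if engine.dialect.has_table(conn, table):   # the DESCRIBE / SHOW FULL TABLES calls in the log
            conn.execute("DROP TABLE %s" % table)   # DROP TABLE <table>
        # CREATE TABLE derived from the DataFrame dtypes, then the INSERTs
        df.to_sql(name=table, con=conn, if_exists="fail", index=False)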
Quite often I get a deadlock from a random thread when the load() method is called:
2020-09-17 09:48:28,138 | INFO | root | query | 44 | 1 | ComplianceInfluxQuery.load()
2020-09-17 09:48:28,160 | INFO | root | query | 44 | 1 | ComplianceInfluxQuery.load()
2020-09-17 09:48:28,241 | ERROR | root | calculate | 124 | 1 | Failed to execute query ComplianceInfluxQuery
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 163, in execute
result = self._query(query)
File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 321, in _query
conn.query(q)
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 505, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 724, in _read_query_result
result.read()
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 1069, in read
first_packet = self.connection._read_packet()
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 676, in _read_packet
packet.raise_for_error()
File "/usr/local/lib/python3.8/site-packages/pymysql/protocol.py", line 223, in raise_for_error
err.raise_mysql_exception(self._data)
File "/usr/local/lib/python3.8/site-packages/pymysql/err.py", line 107, in raise_mysql_exception
raise errorclass(errno, errval)
pymysql.err.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "calculate.py", line 119, in execute_etl
query.run()
File "/chil_etl/query.py", line 45, in run
self.load(df)
File "/chil_etl/query.py", line 22, in load
df.to_sql(
File "/usr/local/lib/python3.8/site-packages/pandas/core/generic.py", line 2653, in to_sql
sql.to_sql(
File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 512, in to_sql
pandas_sql.to_sql(
File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 1316, in to_sql
table.create()
File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 649, in create
self._execute_create()
File "/usr/local/lib/python3.8/site-packages/pandas/io/sql.py", line 641, in _execute_create
self.table.create()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/schema.py", line 927, in create
bind._run_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2097, in _run_visitor
conn._run_visitor(visitorcallable, element, **kwargs)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1656, in _run_visitor
visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/visitors.py", line 145, in traverse_single
return meth(obj, **kw)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 827, in visit_table
self.connection.execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 72, in _execute_on_connection
return connection._execute_ddl(self, multiparams, params)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1068, in _execute_ddl
ret = self._execute_context(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 163, in execute
result = self._query(query)
File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 321, in _query
conn.query(q)
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 505, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 724, in _read_query_result
result.read()
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 1069, in read
first_packet = self.connection._read_packet()
File "/usr/local/lib/python3.8/site-packages/pymysql/connections.py", line 676, in _read_packet
packet.raise_for_error()
File "/usr/local/lib/python3.8/site-packages/pymysql/protocol.py", line 223, in raise_for_error
err.raise_mysql_exception(self._data)
File "/usr/local/lib/python3.8/site-packages/pymysql/err.py", line 107, in raise_mysql_exception
raise errorclass(errno, errval)
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL:
CREATE TABLE cc_influx_share_count (
unique_identifier TEXT,
nfs_share_count FLOAT(53),
smb_share_count FLOAT(53),
s3_bucket_count FLOAT(53)
)
]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
The log shows the load() method being called by two threads at almost the same time. This can happen in any of the classes, regardless of the data.
I ran the command SHOW ENGINE INNODB STATUS, and no deadlocks were listed there.
I checked the general_log table to better understand what happened during the deadlock, but didn't notice anything useful apart from the fact that the thread which deadlocked did not insert any values into the table cc_influx_share_count when (I think) it should have:
SELECT * FROM mysql.general_log WHERE event_time >= "2020-09-17 09:48:27" AND event_time <= "2020-09-17 09:48:29" ORDER BY event_time ASC;
Output (sensitive data removed):
Time: 2020-09-17 09:48:27.010747. Event: COMMIT
Time: 2020-09-17 09:48:27.011075. Event: ROLLBACK
Time: 2020-09-17 09:48:27.012042. Event: CREATE TABLE cc_influx_last_metric_time (
unique_identifier TEXT,
timestamp TIMESTAMP NULL,
uptime BIGINT
)
Time: 2020-09-17 09:48:27.033973. Event: COMMIT
Time: 2020-09-17 09:48:27.034327. Event: ROLLBACK
Time: 2020-09-17 09:48:27.053837. Event: INSERT INTO cc_influx_last_metric_time (unique_identifier, timestamp, uptime) VALUES (...)
Time: 2020-09-17 09:48:27.066930. Event: COMMIT
Time: 2020-09-17 09:48:27.068657. Event: ROLLBACK
Time: 2020-09-17 09:48:27.887579. Event: DESCRIBE `container_post_deployments`
Time: 2020-09-17 09:48:27.889705. Event: ROLLBACK
Time: 2020-09-17 09:48:27.890186. Event: DESCRIBE `container_post_deployments`
Time: 2020-09-17 09:48:27.892125. Event: ROLLBACK
Time: 2020-09-17 09:48:27.892619. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:27.894964. Event: SHOW CREATE TABLE `container_post_deployments`
Time: 2020-09-17 09:48:27.896491. Event: ROLLBACK
Time: 2020-09-17 09:48:27.897097. Event: DROP TABLE container_post_deployments
Time: 2020-09-17 09:48:27.907816. Event: COMMIT
Time: 2020-09-17 09:48:27.908322. Event: ROLLBACK
Time: 2020-09-17 09:48:27.909890. Event: CREATE TABLE container_post_deployments (
image TEXT,
`clientId` TEXT,
message TEXT,
timestamp TIMESTAMP NULL,
status_code BIGINT,
something TEXT,
user_agent TEXT
)
Time: 2020-09-17 09:48:27.928665. Event: COMMIT
Time: 2020-09-17 09:48:27.929089. Event: ROLLBACK
Time: 2020-09-17 09:48:27.932310. Event: INSERT INTO container_post_deployments (image, `clientId`, message, timestamp, status_code, something, user_agent) VALUES (...)
Time: 2020-09-17 09:48:27.934410. Event: COMMIT
Time: 2020-09-17 09:48:27.936774. Event: ROLLBACK
Time: 2020-09-17 09:48:28.140219. Event: DESCRIBE `cc_influx_share_count`
Time: 2020-09-17 09:48:28.163517. Event: DESCRIBE `cc_influx_we_count`
Time: 2020-09-17 09:48:28.166070. Event: ROLLBACK
Time: 2020-09-17 09:48:28.168159. Event: DESCRIBE `cc_influx_share_count`
Time: 2020-09-17 09:48:28.169895. Event: ROLLBACK
Time: 2020-09-17 09:48:28.170583. Event: DESCRIBE `cc_influx_we_count`
Time: 2020-09-17 09:48:28.174444. Event: ROLLBACK
Time: 2020-09-17 09:48:28.176339. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:28.177915. Event: ROLLBACK
Time: 2020-09-17 09:48:28.179331. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:28.182284. Event: SHOW CREATE TABLE `cc_influx_share_count`
Time: 2020-09-17 09:48:28.185154. Event: ROLLBACK
Time: 2020-09-17 09:48:28.192493. Event: SHOW CREATE TABLE `cc_influx_we_count`
Time: 2020-09-17 09:48:28.192887. Event: DROP TABLE cc_influx_share_count
Time: 2020-09-17 09:48:28.194530. Event: ROLLBACK
Time: 2020-09-17 09:48:28.195707. Event: DROP TABLE cc_influx_we_count
Time: 2020-09-17 09:48:28.207712. Event: COMMIT
Time: 2020-09-17 09:48:28.208141. Event: ROLLBACK
Time: 2020-09-17 09:48:28.210087. Event: CREATE TABLE cc_influx_share_count (
unique_identifier TEXT,
nfs_share_count FLOAT(53),
smb_share_count FLOAT(53),
s3_bucket_count FLOAT(53)
)
Time: 2020-09-17 09:48:28.215350. Event: COMMIT
Time: 2020-09-17 09:48:28.216115. Event: ROLLBACK
Time: 2020-09-17 09:48:28.217996. Event: CREATE TABLE cc_influx_we_count (
unique_identifier TEXT,
timestamp TIMESTAMP NULL,
`ANF` FLOAT(53),
`S3` FLOAT(53),
`CVO` FLOAT(53)
)
Time: 2020-09-17 09:48:28.240455. Event: ROLLBACK
Time: 2020-09-17 09:48:28.240908. Event: ROLLBACK
Time: 2020-09-17 09:48:28.244425. Event: COMMIT
Time: 2020-09-17 09:48:28.244965. Event: ROLLBACK
Time: 2020-09-17 09:48:28.249009. Event: INSERT INTO cc_influx_we_count (unique_identifier, timestamp, `ANF`, `S3`, `CVO`) VALUES (...)
Time: 2020-09-17 09:48:28.253638. Event: COMMIT
Time: 2020-09-17 09:48:28.256299. Event: ROLLBACK
Time: 2020-09-17 09:48:28.525814. Event: DESCRIBE `cc_influx_disk_usage`
Time: 2020-09-17 09:48:28.530211. Event: ROLLBACK
Time: 2020-09-17 09:48:28.532392. Event: DESCRIBE `cc_influx_disk_usage`
Time: 2020-09-17 09:48:28.539685. Event: ROLLBACK
Time: 2020-09-17 09:48:28.541868. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:28.560271. Event: SHOW CREATE TABLE `cc_influx_disk_usage`
Time: 2020-09-17 09:48:28.565451. Event: ROLLBACK
Time: 2020-09-17 09:48:28.569257. Event: DROP TABLE cc_influx_disk_usage
Time: 2020-09-17 09:48:28.585562. Event: COMMIT
Time: 2020-09-17 09:48:28.595193. Event: ROLLBACK
Time: 2020-09-17 09:48:28.598230. Event: CREATE TABLE cc_influx_disk_usage (
unique_identifier TEXT,
timestamp TIMESTAMP NULL,
total_gb FLOAT(53),
used_gb FLOAT(53)
)
Time: 2020-09-17 09:48:28.619580. Event: COMMIT
Time: 2020-09-17 09:48:28.620411. Event: ROLLBACK
Time: 2020-09-17 09:48:28.625385. Event: INSERT INTO cc_influx_disk_usage (unique_identifier, timestamp, total_gb, used_gb) VALUES (....)
Time: 2020-09-17 09:48:28.628706. Event: COMMIT
Time: 2020-09-17 09:48:28.631955. Event: ROLLBACK
Time: 2020-09-17 09:48:28.840143. Event: DESCRIBE `cc_influx_aws_subscription`
Time: 2020-09-17 09:48:28.844303. Event: ROLLBACK
Time: 2020-09-17 09:48:28.845637. Event: DESCRIBE `cc_influx_aws_subscription`
Time: 2020-09-17 09:48:28.848076. Event: ROLLBACK
Time: 2020-09-17 09:48:28.848646. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:28.851165. Event: SHOW CREATE TABLE `cc_influx_aws_subscription`
Time: 2020-09-17 09:48:28.852202. Event: ROLLBACK
Time: 2020-09-17 09:48:28.852691. Event: DROP TABLE cc_influx_aws_subscription
Time: 2020-09-17 09:48:28.861657. Event: COMMIT
Time: 2020-09-17 09:48:28.862099. Event: ROLLBACK
Time: 2020-09-17 09:48:28.863288. Event: CREATE TABLE cc_influx_aws_subscription (
unique_identifier TEXT,
timestamp TIMESTAMP NULL,
is_subscribed BIGINT
)
Time: 2020-09-17 09:48:28.878554. Event: COMMIT
Time: 2020-09-17 09:48:28.879113. Event: ROLLBACK
Time: 2020-09-17 09:48:28.881054. Event: INSERT INTO cc_influx_aws_subscription (unique_identifier, timestamp, is_subscribed) VALUES (....)
Time: 2020-09-17 09:48:28.882642. Event: COMMIT
Time: 2020-09-17 09:48:28.884614. Event: ROLLBACK
Time: 2020-09-17 09:48:28.918677. Event: DESCRIBE `hubspot_data`
Time: 2020-09-17 09:48:28.922938. Event: ROLLBACK
Time: 2020-09-17 09:48:28.923993. Event: DESCRIBE `hubspot_data`
Time: 2020-09-17 09:48:28.928181. Event: ROLLBACK
Time: 2020-09-17 09:48:28.928808. Event: SHOW FULL TABLES FROM `chil_etl`
Time: 2020-09-17 09:48:28.931225. Event: SHOW CREATE TABLE `hubspot_data`
Time: 2020-09-17 09:48:28.934269. Event: ROLLBACK
Time: 2020-09-17 09:48:28.934851. Event: DROP TABLE hubspot_data
Time: 2020-09-17 09:48:28.949309. Event: COMMIT
Time: 2020-09-17 09:48:28.949778. Event: ROLLBACK
Time: 2020-09-17 09:48:28.953829. Event: CREATE TABLE hubspot_data (...)
Time: 2020-09-17 09:48:28.973177. Event: COMMIT
Time: 2020-09-17 09:48:28.973652. Event: ROLLBACK
This ETL is the only process using the MySQL server.
I have read the documentation about why deadlocks occur but I can't understand how two different tables with no connection between them can cause a deadlock.
I know I can simply run the load() method again until it succeeds, but I want to understand why the deadlocks occur and how to prevent them.
Versions: MySQL 8.0.21, Python 3.8.4, SQLAlchemy 1.3.19, pandas 1.0.5, PyMySQL 0.10.1.
A possible workaround I found for this issue is a retry mechanism: if a deadlock occurs, sleep and try a few more times until the load succeeds, while keeping the DataFrame in memory:
import logging
from time import sleep


class Query(abc.ABC):
    def __init__(self):
        self.engine = MysqlEngine.engine()
        ...

    ...

    def load(self, df: pd.DataFrame) -> None:
        for _ in range(5):  # if load fails due to a deadlock, try up to 4 more times
            try:
                df.to_sql(
                    name=self.table,
                    con=self.engine.connect(),
                    if_exists="replace",
                    index=False,
                )
                return
            except sqlalchemy.exc.OperationalError as ex:
                if "1213" not in repr(ex):  # not a deadlock: don't swallow it
                    raise
                logging.warning(
                    "Failed to acquire lock for %s", self.__class__.__name__
                )
                sleep(1)
The deadlocks still occur and you lose some performance, but it beats doing the entire extract and transform all over again.