I'm trying to insert a pandas DataFrame into a MySQL database using the executemany method of the raw cursor obtained through SQLAlchemy. It's a fast and efficient way to bulk insert data, but I can't find a way to insert pandas.NA / numpy.nan / None values without getting a MySQLdb._exceptions.ProgrammingError or a MySQLdb._exceptions.OperationalError.
import pandas as pd
from sqlalchemy import create_engine


def insert(dff, table_name):
    engine = create_engine('mysql://user:password@host:port/database?charset=utf8', echo=False)

    # The query without the values to insert
    query_template = """
    INSERT INTO %s (%s)
    VALUES (%s)
    ON DUPLICATE KEY UPDATE %s;
    """ % (
        table_name,
        ",".join(["`%s`" % colname for colname in dff.columns]),
        ",".join(["%s"] * len(dff.columns)),
        ",".join(["`%s`=VALUES(`%s`)" % (colname, colname) for colname in dff.columns]),
    )

    # Connection and bulk insert
    with engine.begin() as connection:
        raw_connection = connection.engine.raw_connection()
        mycursor = raw_connection.cursor()
        mycursor.executemany(query_template, dff.values.tolist())  # /!\ Here is the problem /!\
        raw_connection.commit()
    engine.dispose()


dff = pd.DataFrame({"col1": ["a", "b", "c", "d"], "col2": [1, pd.NA, 2, 3], "col3": [0.0, 1, pd.NA, 3.43]})
insert(dff, "my_table")
I could probably precompute every query, or one big query, by iterating over the DataFrame content, but that would be very inefficient. The pandas DataFrame to_sql method lacks the flexibility needed to fine-tune the ON DUPLICATE KEY clause, so it isn't an option.
What would be the most efficient way to perform a pandas DataFrame bulk insert with a custom query and nan values?
The real problem is that dff.values creates a typed matrix that can't contain None values in int or float columns, so the missing values stay as pandas.NA or nan. In reality, executemany itself can insert None values.
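To make this concrete, here is a small check on the example DataFrame from the question. It is only a sketch for inspecting the data; the names rows, missing_mask and has_none are made up for illustration and no database is involved.

```python
import pandas as pd

# Same example data as in the question
dff = pd.DataFrame({
    "col1": ["a", "b", "c", "d"],
    "col2": [1, pd.NA, 2, 3],
    "col3": [0.0, 1, pd.NA, 3.43],
})

# This is exactly what was being handed to executemany
rows = dff.values.tolist()

# True wherever a cell is missing (pd.NA, nan or None)
missing_mask = [[bool(pd.isna(value)) for value in row] for row in rows]

# None never appears: the missing cells are still pd.NA objects
has_none = any(value is None for row in rows for value in row)

print(rows)
print(missing_mask)
print(has_none)
```

The missing cells come out of tolist() as pd.NA rather than None, which is what the database driver ends up choking on.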
The fastest solution I've found is to fix the list of lists passed to executemany, instead of fixing the DataFrame content before building that list. The inserted data is no longer dff.values.tolist() but:
inserted_data = [
    [None if pd.isnull(value) else value for value in sublist]
    for sublist in dff.values.tolist()
]
mycursor.executemany(query_template, inserted_data)
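For anyone who wants to try the conversion without a MySQL server, here is a self-contained sketch. It wraps the list comprehension in a helper called to_db_rows (a name made up for this example) and uses the standard-library sqlite3 module purely as a stand-in database, so the placeholders are ? instead of %s and there is no ON DUPLICATE KEY clause. The table layout is also an assumption.

```python
import sqlite3

import pandas as pd


def to_db_rows(df):
    """Hypothetical helper: DataFrame rows as lists, missing values replaced by None."""
    return [
        [None if pd.isnull(value) else value for value in row]
        for row in df.values.tolist()
    ]


dff = pd.DataFrame({
    "col1": ["a", "b", "c", "d"],
    "col2": [1, pd.NA, 2, 3],
    "col3": [0.0, 1, pd.NA, 3.43],
})
rows = to_db_rows(dff)

# sqlite3 in-memory database used only as a stand-in for MySQL
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (col1 TEXT PRIMARY KEY, col2 INTEGER, col3 REAL)")
conn.executemany("INSERT INTO my_table (col1, col2, col3) VALUES (?, ?, ?)", rows)

# Count the rows where a missing value ended up as SQL NULL
null_count = conn.execute(
    "SELECT COUNT(*) FROM my_table WHERE col2 IS NULL OR col3 IS NULL"
).fetchone()[0]
conn.close()
```

With the MySQL cursor from the question, the same rows list would be passed to mycursor.executemany(query_template, rows).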