I am trying to do the bulk insert to the mssql database. The app should fetch data from the api, arrange it into pandas.DataFrame, force int type on column "num" .astype(int).
After this I would like to start transaction with database to
Can someone please explain to me why am I getting the error "num (INTEGER) not a string" when I try to run this version of code:
SQL Table schema:
CREATE TABLE [dbo].[discountDimensions](
[ID] [int] IDENTITY(1,1) NOT NULL,
[num] [int] NOT NULL,
[name] [varchar](60) NULL,
[posPercent] [float] NULL
)
In DataProcessor.py:
def discounts_dim_to_df(self, data:dict):
df = pd.DataFrame(data["discounts"])
df = df.drop(columns=['num', 'name'])
df['mstrNum'] = df["mstrNum"].astype(int)
return df.rename(columns={"mstrNum":"num","mstrName":"name"})
In Conn.py:
class Conn:
def __init__(self, server:str, database:str, username:str, passwd:str):
driver = 'ODBC Driver 17 for SQL Server'
connection_string = f'mssql+pyodbc://{username}:{passwd}@{server}:1433/{database}?driver={driver}'
try:
self.engine = create_engine(connection_string, pool_pre_ping=True)
Session = sessionmaker(bind=self.engine)
self.session = Session()
except Exception as err:
print(f'Failed to connect to {server} -> {database}')
mm = Mailman()
mm.connect_to_db_failed(server=server,db=database,error_message=err)
def update_discounts_db(self, data:pd.DataFrame) -> bool:
create_temp_sql = '''
CREATE TABLE #tempDiscountDimensions (
num INT PRIMARY KEY,
name VARCHAR(60) NOT NULL,
posPercent FLOAT NULL
);
'''
merge_sql = '''
MERGE INTO discountDimensions AS target
USING #tempDiscountDimensions AS source
ON target.num = source.num
WHEN MATCHED THEN
UPDATE SET target.name = source.name,
target.posPercent = source.posPercent
WHEN NOT MATCHED THEN
INSERT (num, name, posPercent) VALUES (source.num, source.name, source.posPercent);
'''
try:
self.session.execute(text("""IF OBJECT_ID('tempdb..#tempDiscountDimensions', 'U') IS NOT NULL
DROP TABLE #tempDiscountDimensions;"""))
self.session.execute(text(create_temp_sql))
data.to_sql('#tempDiscountDimensions',
con=self.session,
if_exists='append',
index=False,
dtype={
"num": Integer(),
"name": String(),
"posPercent": Float()
})
self.session.execute(text(merge_sql))
self.session.execute(text('DROP TABLE #tempDiscountDimensions'))
self.session.commit()
except Exception as err:
self.session.rollback()
print(f'Failed to update/insert data from discountDimensions table:\n{err}')
mm = Mailman()
mm.process_failed(option='db_connection', error=err)
return None
Gives error:
Traceback (most recent call last):
File "path\to\folder\main.py", line 81, in <module>
conn.update_discounts_db(data=data_disc)
File "path\to\folder\Conn.py", line 82, in update_discounts_db
data.to_sql('#tempDiscountDimensions',
File "path\to\folder\_env\Lib\site-packages\pandas\util\_decorators.py", line 333, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "path\to\folder\_env\Lib\site-packages\pandas\core\generic.py", line 3087, in to_sql
return sql.to_sql(
^^^^^^^^^^^
File "path\to\folder\_env\Lib\site-packages\pandas\io\sql.py", line 842, in to_sql
return pandas_sql.to_sql(
^^^^^^^^^^^^^^^^^^
File "path\to\folder\_env\Lib\site-packages\pandas\io\sql.py", line 2839, in to_sql
raise ValueError(f"{col} ({my_type}) not a string")
ValueError: num (INTEGER) not a string
But when I change "self.session" to the "with self.engine.begin() as connection" it works. However, I lost the ability to roll back in case of an error.
try:
with self.engine.begin() as connection:
connection.execute(text("""IF OBJECT_ID('tempdb..#tempDiscountDimensions', 'U') IS NOT NULL
DROP TABLE #tempDiscountDimensions;"""))
connection.execute(text(create_temp_sql))
data.to_sql('#tempDiscountDimensions',
con=connection,
if_exists='append',
index=False,
dtype={
"num": Integer(),
"name": String(),
"posPercent": Float()
})
connection.execute(text(merge_sql))
connection.execute(text('DROP TABLE #tempDiscountDimensions'))
The issue I was having was due to passing an instance of a Session to the "con" in pandas.DataFrame.to_sql and not the issues with the data type as I previously suspected.
According to pandas' docs DataFrame.to_sql
parameter "con" should have "sqlalchemy.engine.(Engine or Connection) or sqlite3.Connection"
Thanks to @GordThompson pointing me in the right direction, I checked through the SQLAlchemy docs on the Session.connection() which it starts transaction and is used to procure a new connection on the session's bound engin, that is exactly what my df.to_sql(con=) needed.