pythonpandassqlalchemy

Why SQLAlchemy gives error "num (INTEGER) not a string" when used with Session but not when used with engine.begin()


I am trying to do the bulk insert to the mssql database. The app should fetch data from the api, arrange it into pandas.DataFrame, force int type on column "num" .astype(int).

After this I would like to start transaction with database to

  1. create temporary table
  2. insert data into tempTable
  3. merge tempTable with prodTable
  4. commit transaction if no errors occurred or rollback if database raised an error.

Can someone please explain to me why am I getting the error "num (INTEGER) not a string" when I try to run this version of code:

SQL Table schema:

CREATE TABLE [dbo].[discountDimensions](
[ID] [int] IDENTITY(1,1) NOT NULL,
[num] [int] NOT NULL,
[name] [varchar](60) NULL,
[posPercent] [float] NULL
)

In DataProcessor.py:

    def discounts_dim_to_df(self, data:dict):
        df = pd.DataFrame(data["discounts"])
        df = df.drop(columns=['num', 'name'])
        df['mstrNum'] = df["mstrNum"].astype(int)
        return df.rename(columns={"mstrNum":"num","mstrName":"name"})

In Conn.py:

  class Conn:
     def __init__(self, server:str, database:str, username:str, passwd:str): 
        driver = 'ODBC Driver 17 for SQL Server'
        connection_string = f'mssql+pyodbc://{username}:{passwd}@{server}:1433/{database}?driver={driver}'
        try:
            self.engine = create_engine(connection_string, pool_pre_ping=True)
            Session = sessionmaker(bind=self.engine)
            self.session = Session()
        except Exception as err:
            print(f'Failed to connect to {server} -> {database}')
            mm = Mailman()
            mm.connect_to_db_failed(server=server,db=database,error_message=err)

     def update_discounts_db(self, data:pd.DataFrame) -> bool:
         create_temp_sql = '''
         CREATE TABLE #tempDiscountDimensions (
            num INT PRIMARY KEY,
            name VARCHAR(60) NOT NULL,
            posPercent FLOAT NULL
         );
         '''
         merge_sql = '''
         MERGE INTO discountDimensions AS target
         USING #tempDiscountDimensions AS source
         ON target.num = source.num
         WHEN MATCHED THEN
            UPDATE SET target.name = source.name,
                       target.posPercent = source.posPercent
         WHEN NOT MATCHED THEN
            INSERT (num, name, posPercent) VALUES (source.num, source.name, source.posPercent);
         '''  
         try:
             self.session.execute(text("""IF OBJECT_ID('tempdb..#tempDiscountDimensions', 'U') IS NOT NULL
                                DROP TABLE #tempDiscountDimensions;"""))
             self.session.execute(text(create_temp_sql))
             data.to_sql('#tempDiscountDimensions',
                    con=self.session,
                    if_exists='append',
                    index=False,
                    dtype={
                        "num": Integer(),
                        "name": String(),
                        "posPercent": Float()
                    })

             self.session.execute(text(merge_sql))
             self.session.execute(text('DROP TABLE #tempDiscountDimensions'))
             self.session.commit()

         except Exception as err:
             self.session.rollback()
             print(f'Failed to update/insert data from discountDimensions table:\n{err}')
             mm = Mailman()
             mm.process_failed(option='db_connection', error=err)
             return None

Gives error:

Traceback (most recent call last):
  File "path\to\folder\main.py", line 81, in <module>
    conn.update_discounts_db(data=data_disc)
  File "path\to\folder\Conn.py", line 82, in update_discounts_db
    data.to_sql('#tempDiscountDimensions',
  File "path\to\folder\_env\Lib\site-packages\pandas\util\_decorators.py", line 333, in wrapper
    return func(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^
  File "path\to\folder\_env\Lib\site-packages\pandas\core\generic.py", line 3087, in to_sql
    return sql.to_sql(
          ^^^^^^^^^^^
  File "path\to\folder\_env\Lib\site-packages\pandas\io\sql.py", line 842, in to_sql
    return pandas_sql.to_sql(
          ^^^^^^^^^^^^^^^^^^
  File "path\to\folder\_env\Lib\site-packages\pandas\io\sql.py", line 2839, in to_sql
    raise ValueError(f"{col} ({my_type}) not a string")
ValueError: num (INTEGER) not a string

But when I change "self.session" to the "with self.engine.begin() as connection" it works. However, I lost the ability to roll back in case of an error.

        try:
            with self.engine.begin() as connection:
                connection.execute(text("""IF OBJECT_ID('tempdb..#tempDiscountDimensions', 'U') IS NOT NULL
                                    DROP TABLE #tempDiscountDimensions;"""))
                connection.execute(text(create_temp_sql))
                data.to_sql('#tempDiscountDimensions',
                        con=connection,
                        if_exists='append',
                        index=False,
                        dtype={
                            "num": Integer(),
                            "name": String(),
                            "posPercent": Float()
                        })

                connection.execute(text(merge_sql))
                connection.execute(text('DROP TABLE #tempDiscountDimensions'))

Solution

  • The issue I was having was due to passing an instance of a Session to the "con" in pandas.DataFrame.to_sql and not the issues with the data type as I previously suspected.

    According to pandas' docs DataFrame.to_sql

    parameter "con" should have "sqlalchemy.engine.(Engine or Connection) or sqlite3.Connection"

    Thanks to @GordThompson pointing me in the right direction, I checked through the SQLAlchemy docs on the Session.connection() which it starts transaction and is used to procure a new connection on the session's bound engin, that is exactly what my df.to_sql(con=) needed.