pythonmysqlsqlalchemypymysql

Executing update statement with SQLAlchemy does not update rows, but statement works perfectly in when executed in MySQL Workbench


I have the following code:

from sqlalchemy import create_engine

SCHEMA = 'dev_gba'
TABLE = 'dev_l1c_v2'
USER = 'db-user'
PASSWORD = '-'
ENDPOINT = '-.us-east-1.rds.amazonaws.com'
process_start = 'SOME_VAL'
process_end = 'SOME_VAL'
granule_id = 'A006202_20160829T191558'

engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")
connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()        
    cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID  = "{granule_id}"')
    cursor_obj.close()
finally:
    connection.close()

If I select all from the database, then I can see the rows that are not updated. However, if I print the statement like so:

print(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"')

The output is:

UPDATE dev_gba.dev_l1c_v2 SET PROCESS_START_TIME = "SOME_VAL", PROCESS_END_TIME = "SOME_VAL" WHERE dev_gba.dev_l1c_v2.GRANULE_ID = "A006202_20160829T191558"

If I take this and copy and paste it into MySQL workbench, it will execute, and I can see that the rows were updated.

I have disabled safe update in the workbench and have tried adding it before my execute statement as

cursor_obj.execute('SET SQL_SAFE_UPDATES = 0')

but that also doesn't work. Here is the other thing that is confusing, earlier in my code I run the following:

connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()
    cursor_obj.execute(f'CREATE TEMPORARY TABLE {TEMP_TABLE} SELECT {TABLE}.index FROM {SCHEMA}.{TABLE} WHERE IN_PROGRESS = 0 AND PROCESSED = 0 ORDER BY RAND() LIMIT {CPU_COUNT}')
    cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET IN_PROGRESS = 1, INSTANCE_ID = "{INSTANCE_ID}" WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
    cursor_obj.execute(f'SELECT BASE_URL FROM {SCHEMA}.{TABLE} WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
    result = cursor_obj.fetchall()
    cursor_obj.execute(f'DROP TABLE {TEMP_TABLE}')
    cursor_obj.close()
finally:
    connection.close()

The update statement in this code works just fine and there are no issues. I have also tried adding echo=True to my create engine line:

engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}", echo = True)

The output is:

2021-12-31 10:17:09,613 INFO sqlalchemy.engine.Engine SHOW VARIABLES LIKE 'sql_mode'

2021-12-31 10:17:09,616 INFO sqlalchemy.engine.Engine [raw sql] {}

2021-12-31 10:17:09,700 INFO sqlalchemy.engine.Engine SHOW VARIABLES LIKE 'lower_case_table_names'

2021-12-31 10:17:09,701 INFO sqlalchemy.engine.Engine [generated in 0.00143s] {}

2021-12-31 10:17:09,858 INFO sqlalchemy.engine.Engine SELECT DATABASE()

2021-12-31 10:17:09,859 INFO sqlalchemy.engine.Engine [raw sql] {}

Which isn't very useful.

I have also tried:

from sqlalchemy.sql import text
cursor_obj.execute(text(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID  = "{granule_id}"'))

This gives the following error:

TypeError: object of type 'TextClause' has no len()

Not really sure where to go from here.


Solution

  • On review (thanks for making me look again downvoter!), the SQL in the question is well-formed - the problem is the connection's commit method is not called, so the changes are never applied to the database. Nevertheless it remains true that if you are using SQLAlchemy you shouldn't be using raw connections and raw SQL unnecessarily, so the concepts in this answer remain valid in my view.


    Using string formatting to create SQL statements in Python is error-prone and should be avoided if there are better tools available.

    You could run the raw query like this using SQLAlchemy core like this, without having to drop down to the raw connection:

    import sqlalchemy as sa
    
    engine = sa.create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")
    
    
    # Reflect the database table into an object  
    tbl = sa.Table(TABLE, sa.MetaData(), autoload_with=engine)
    # Create an update object
    upd = sa.update(tbl).where(tbl.c.GRANULE_ID == granule_id).values(PROCESS_START_TIME=process_start_time, PROCESS_END_TIME=process_end_time)
    
    # The "begin" context manager will automatically commit on exit
    with engine.begin() as conn:
        conn.execute(upd)
    

    If you need to use raw SQL, you can do it like this (see Using Textual SQL):

    # We need to use string formatting to set the table; SQLAlchemy will automatically qualify it with the schema name.
    stmt = f'UPDATE {TABLE} SET PROCESS_START_TIME = :process_start_time, PROCESS_END_TIME = :process_end_time WHERE {TABLE}.GRANULE_ID  = :granule_id'
    
    values = {
        'process_start_time': process_start_time,
        'process_end_time': process_end_time,
        'granule_id': granule_id,
    }
    
    with engine.begin() as conn:
        conn.execute(sa.text(stmt), values)