
Upsert on conflict with SQLAlchemy 2.x and PostgreSQL


I am using Python to query an external API, transform the data, and write it to an internal PostgreSQL database.

In that process, I use pandas to compare the API result with the existing data in the database, producing a single DataFrame that holds both new records and existing records that have changed.

What I want is to hand the DataFrame (or a dictionary) over to SQLAlchemy and have it processed so that:

  1. new records are just appended
  2. existing records are updated

This is how I approached it (I am new to Python, so please bear with my beginner skills):

def update_absence(year):
    api_result = get_absence(year)
    db_result = get_database_absence(year)
    df = compare_dataframes(api_result, db_result, 'id')
    metadata_obj = MetaData()
    metadata_obj.reflect(bind=engine)
    some_table = Table("tb_absence", metadata_obj, autoload_with=engine)

    for item in df.to_dict('records'):
        insert_stmt = insert(some_table).values(item).on_conflict_do_update(constraint='tb_absence_pkey', set_=item)
        print(insert_stmt.compile())
        with engine.connect() as conn:
            result = conn.execute(insert_stmt)
            print(result.rowcount)

    conn.commit()

The output of insert_stmt.compile() is the following:

INSERT INTO tb_absence (id, start_date, end_date, half_day, morning, user_id, employee_id, type, extra_vacation, state, substitute_state, workdays, hours, medical_certificate, comments, substitute_user_id, name) VALUES (%(id)s, %(start_date)s, %(end_date)s, %(half_day)s, %(morning)s, %(user_id)s, %(employee_id)s, %(type)s, %(extra_vacation)s, %(state)s, %(substitute_state)s, %(workdays)s, %(hours)s, %(medical_certificate)s, %(comments)s, %(substitute_user_id)s, %(name)s) ON CONFLICT ON CONSTRAINT tb_absence_pkey DO UPDATE SET id = %(param_1)s, start_date = %(param_2)s, end_date = %(param_3)s, half_day = %(param_4)s, morning = %(param_5)s, user_id = %(param_6)s, employee_id = %(param_7)s, type = %(param_8)s, extra_vacation = %(param_9)s, state = %(param_10)s, substitute_state = %(param_11)s, workdays = %(param_12)s, hours = %(param_13)s, medical_certificate = %(param_14)s, comments = %(param_15)s, substitute_user_id = %(param_16)s, name = %(param_17)s

The rowcount is 1 for each item I iterate over (the print statement will be replaced by a proper log entry once I have understood the approach). However, the database is never updated. I can't really get my head around on_conflict_do_update or how to handle the connection and engine.

I guess I have some fundamental gaps in understanding, and the examples in the SQLAlchemy tutorial for this part are hard for me to follow because they only give small snippets and fragments of the solution. I would probably need a complete working example. Reviewing other questions here on SO did not lead to a good understanding on my side either.

All methods for identifying the differences of the datasets are working fine.

I highly appreciate any hint that could help me progress here.


Solution

  • By looping with for item in df.to_dict('records'): you create and send a separate INSERT statement for each row. For example, with my table …

    some_table = sa.Table(
        "thing",
        sa.MetaData(),
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
        sa.Column("txt", sa.String),
    )
    

    … and DataFrame …

    df = pd.DataFrame([(1, "txt_1"), (2, "txt_2")], columns=["id", "txt"])
    

    setting engine.echo = True shows that your code results in

    INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET id = %(param_1)s, txt = %(param_2)s
    [no key 0.00090s] {'id': 1, 'txt': 'txt_1', 'param_1': 1, 'param_2': 'txt_1'}
    INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET id = %(param_1)s, txt = %(param_2)s
    [no key 0.00066s] {'id': 2, 'txt': 'txt_2', 'param_1': 2, 'param_2': 'txt_2'}
    

    We can turn that into an "executemany" by passing the whole list of records to a single execute() call, here with textual SQL:

    sql = sa.text("INSERT INTO thing (id, txt) VALUES (:id, :txt) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET txt = :txt")
    with engine.begin() as conn:
        conn.execute(sql, df.to_dict("records"))
    

    which produces

    INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET txt = %(txt)s
    [generated in 0.00086s] [{'id': 1, 'txt': 'txt_1'}, {'id': 2, 'txt': 'txt_2'}]
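
One detail in the snippet above is easy to miss: engine.begin() commits when the block exits without an error, while a plain with engine.connect() block (as in the question) closes the connection without committing unless conn.commit() is called inside it, so the work is rolled back. The sketch below illustrates the difference. It assumes an in-memory SQLite database as a stand-in for PostgreSQL so that it runs without a server; the table mirrors the thing table above, and count_rows is a helper made up for this illustration.

```python
import sqlalchemy as sa

# Assumption: in-memory SQLite stands in for PostgreSQL so the sketch is
# runnable anywhere. future=True is redundant on SQLAlchemy 2.x and only
# makes 1.4 use the same 2.x-style transaction handling.
engine = sa.create_engine("sqlite://", future=True)

metadata = sa.MetaData()
thing = sa.Table(
    "thing",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
    sa.Column("txt", sa.String),
)
metadata.create_all(engine)


def count_rows() -> int:
    """Hypothetical helper: number of rows currently visible in the table."""
    with engine.connect() as conn:
        return conn.execute(
            sa.select(sa.func.count()).select_from(thing)
        ).scalar_one()


# "Commit as you go" style, but without the commit: the connection is closed
# at the end of the block and the pending INSERT is rolled back.
with engine.connect() as conn:
    conn.execute(sa.insert(thing), {"id": 1, "txt": "txt_1"})
rows_without_commit = count_rows()

# "Begin once" style: the transaction is committed when the block exits
# without raising an exception.
with engine.begin() as conn:
    conn.execute(sa.insert(thing), {"id": 1, "txt": "txt_1"})
rows_with_begin = count_rows()

print(rows_without_commit, rows_with_begin)
```

In the question's loop, moving conn.commit() inside the with engine.connect() block, or switching to engine.begin(), should therefore be enough to make the changes persist.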
    

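    Alternatively, SQLAlchemy Core can build the statement for us. Passing the list of records to execute() again sends everything as one batch:

    from sqlalchemy.dialects.postgresql import insert

    stmt = insert(some_table)
    # stmt.excluded refers to the row that was proposed for insertion
    do_update_stmt = stmt.on_conflict_do_update(
        constraint="thing_pkey", set_={excl.name: excl for excl in stmt.excluded}
    )
    with engine.begin() as conn:
        conn.execute(do_update_stmt, df.to_dict("records"))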
    

    which produces

    INSERT INTO thing (id, txt) VALUES (%(id__0)s, %(txt__0)s), (%(id__1)s, %(txt__1)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET id = excluded.id, txt = excluded.txt
    [no key 0.00017s (insertmanyvalues) 1/1 (unordered)] {'id__0': 1, 'txt__0': 'txt_1', 'id__1': 2, 'txt__1': 'txt_2'}
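
A possible generalisation of the Core approach, sketched under a few assumptions: build_upsert is a hypothetical helper (not part of SQLAlchemy) that targets the primary-key columns via index_elements instead of naming the constraint, and leaves the key columns out of the SET clause since they are what the conflict is detected on. The statement is only compiled against the PostgreSQL dialect here, so no database connection is needed to inspect the SQL.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert


def build_upsert(table: sa.Table):
    """Hypothetical helper: INSERT ... ON CONFLICT (pk) DO UPDATE for a table.

    Assumes the table has a primary key and at least one non-key column.
    """
    pk_names = [col.name for col in table.primary_key.columns]
    stmt = insert(table)
    # Update every non-key column with the value from the proposed row.
    update_cols = {
        col.name: stmt.excluded[col.name]
        for col in table.columns
        if col.name not in pk_names
    }
    return stmt.on_conflict_do_update(index_elements=pk_names, set_=update_cols)


# Same shape as the example table used in the answer.
thing = sa.Table(
    "thing",
    sa.MetaData(),
    sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
    sa.Column("txt", sa.String),
)

# Compile for PostgreSQL without connecting to a database.
compiled_sql = str(build_upsert(thing).compile(dialect=postgresql.dialect()))
print(compiled_sql)
```

With a real engine, the statement would be executed the same way as above, for example conn.execute(build_upsert(some_table), df.to_dict("records")) inside a with engine.begin() as conn: block.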