I am using python to query an external api, transform the data and write it to a postgresql database internally.
In that process, I am comparing the result from the api with existing data in the database using pandas and generate a dataframe that has new records and existing records that have changed in one dataframe.
What I want to do is, to hand over the dataframe or dictionary to sqlalchemy and get it processed in a way that:
This is how I approached it (I am a newbie in python, so please be patient with my beginner skills...)
def update_absence(year):
api_result = get_absence(year)
db_result = get_database_absence(year)
df = compare_dataframes(api_result, db_result, 'id')
metadata_obj = MetaData()
metadata_obj.reflect(bind=engine)
some_table = Table("tb_absence", metadata_obj, autoload_with=engine)
for item in df.to_dict('records'):
insert_stmt = insert(some_table).values(item).on_conflict_do_update(constraint='tb_absence_pkey', set_=item)
print(insert_stmt.compile())
with engine.connect() as conn:
result = conn.execute(insert_stmt)
print(result.rowcount)
conn.commit()
the output for the insert_stmt.compile() is the following:
INSERT INTO tb_absence (id, start_date, end_date, half_day, morning, user_id, employee_id, type, extra_vacation, state, substitute_state, workdays, hours, medical_certificate, comments, substitute_user_id, name) VALUES (%(id)s, %(start_date)s, %(end_date)s, %(half_day)s, %(morning)s, %(user_id)s, %(employee_id)s, %(type)s, %(extra_vacation)s, %(state)s, %(substitute_state)s, %(workdays)s, %(hours)s, %(medical_certificate)s, %(comments)s, %(substitute_user_id)s, %(name)s) ON CONFLICT ON CONSTRAINT tb_absence_pkey DO UPDATE SET id = %(param_1)s, start_date = %(param_2)s, end_date = %(param_3)s, half_day = %(param_4)s, morning = %(param_5)s, user_id = %(param_6)s, employee_id = %(param_7)s, type = %(param_8)s, extra_vacation = %(param_9)s, state = %(param_10)s, substitute_state = %(param_11)s, workdays = %(param_12)s, hours = %(param_13)s, medical_certificate = %(param_14)s, comments = %(param_15)s, substitute_user_id = %(param_16)s, name = %(param_17)s
and the rowcount is 1 for each item that I iterate through (the print statement which will vanish for a real log entry once I have understood the approach). However, the database is never updated. I can't really get my head around the conflict_do_update thing and how to handle the connection and engine.
I guess I have some fundamental issues of understanding it and the examples I find on the sqlalchemy tutorial for that part are hard to comprehend for my, as they only give small snippets and fragments of the solution. I would need a complete working example probably. Also reviewing other questions here on SO do did not lead to a good understanding on my side.
All methods for identifying the differences of the datasets are working fine.
I highly appreciate any hint that could help me progress here.
By looping through for item in df.to_dict('records'):
you are creating and sending a separate INSERT for each row. For example, with my table …
some_table = sa.Table(
"thing",
sa.MetaData(),
sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
sa.Column("txt", sa.String),
)
… and DataFrame …
df = pd.DataFrame([(1, "txt_1"), (2, "txt_2")], columns=["id", "txt"])
… engine.echo = True
shows that your code results in
INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET id = %(param_1)s, txt = %(param_2)s
[no key 0.00090s] {'id': 1, 'txt': 'txt_1', 'param_1': 1, 'param_2': 'txt_1'}
INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET id = %(param_1)s, txt = %(param_2)s
[no key 0.00066s] {'id': 2, 'txt': 'txt_2', 'param_1': 2, 'param_2': 'txt_2'}
We can turn that into an "executemany" using textual SQL
sql = sa.text("INSERT INTO thing (id, txt) VALUES (:id, :txt) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET txt = :txt")
with engine.begin() as conn:
conn.execute(sql, df.to_dict("records"))
which produces
INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET txt = %(txt)s
[generated in 0.00086s] [{'id': 1, 'txt': 'txt_1'}, {'id': 2, 'txt': 'txt_2'}]
Or, SQLAlchemy Core can build the statement for us:
stmt = insert(some_table)
do_update_stmt = stmt.on_conflict_do_update(
"thing_pkey", set_={excl.name: excl for excl in stmt.excluded}
)
which produces
INSERT INTO thing (id, txt) VALUES (%(id__0)s, %(txt__0)s), (%(id__1)s, %(txt__1)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET id = excluded.id, txt = excluded.txt
[no key 0.00017s (insertmanyvalues) 1/1 (unordered)] {'id__0': 1, 'txt__0': 'txt_1', 'id__1': 2, 'txt__1': 'txt_2'}