I have a data set on a pretty small Microsoft Fabric capacity (which, if you don't know it, is basically Azure Synapse, which is basically Apache Spark).
Due to limitations with the data source, I am basically getting a full dump every day. So I need to update a "last seen" timestamp on rows I already know (i.e. identical data), and append the changed ones.
This is not hard to do, but I am looking for the most efficient option.
Loading the new data into df_update and the existing data into df_existing, I have tried two ways of doing this:
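For reference, the setup looks roughly like this (the table name, dump path and format are placeholders for my real ones):
from datetime import datetime, timezone
from pyspark.sql.functions import lit
from pyspark.sql.types import TimestampType
new_timestamp = datetime.now(timezone.utc)              # today's "last seen" value
df_existing = spark.read.table("my_table")              # current Delta table contents
df_update = (spark.read.parquet("Files/daily_dump")     # today's full dump
             .withColumn('ts', lit(new_timestamp).cast(TimestampType())))
all_columns_but_the_timestamp = [c for c in df_existing.columns if c != 'ts']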
-- 1 -- Using pyspark data frames:
I can solve the task with an outer join like
from pyspark.sql.functions import coalesce
# Outer join on everything except the timestamp: rows found in both frames get
# the fresh ts from df_update, rows only in df_existing keep their old ts.
df_new = df_existing\
    .withColumnRenamed('ts', 'ts_old')\
    .join(df_update, on=all_columns_but_the_timestamp, how='outer')
return df_new\
    .withColumn('ts', coalesce(df_new['ts'], df_new['ts_old']))\
    .drop('ts_old')
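Writing the result back then means overwriting the whole table, roughly (table name is a placeholder):
# full rewrite of the Delta table with the merged result
df_new.write.format("delta").mode("overwrite").saveAsTable("my_table")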
Unfortunately, that full rewrite is slow and seems to upset OneLake a bit (seeing the updated data in a query takes additional time). Therefore I tried:
-- 2 -- Using delta lake update
By using
# compare only the non-timestamp columns so both sides have the same schema
df_new = df_update.select(all_columns_but_the_timestamp).exceptAll(df_existing.select(all_columns_but_the_timestamp))
df_duplicates = df_update.select(all_columns_but_the_timestamp).exceptAll(df_new)
I can get the new and the revisited data.
for row in df_duplicates.collect():
    # one UPDATE statement per revisited row
    table.update(
        ' AND '.join([f'{k} = "{v}"' for k, v in row.asDict().items()]),
        {'ts': lit(new_timestamp).cast(TimestampType())})
However, this is a woefully slow way to do the updates. df_new can then just be appended to the table afterwards.
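The append itself would be something like this (table name is a placeholder; ts is re-added because df_new only carries the non-timestamp columns here):
# append the genuinely new rows with the current "last seen" timestamp
df_new.withColumn('ts', lit(new_timestamp).cast(TimestampType()))\
    .write.format("delta").mode("append").saveAsTable("my_table")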
I have also looked for
-- 3 -- Delta lake update in bulk
Somehow selecting all affected rows in one go and updating the value:
table.update(
    some_very_neat_condition,
    {'ts': lit(new_timestamp).cast(TimestampType())})
However, since I don't have reliable IDs, I don't know how to build such a condition.
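One conceivable shape for such a condition would be a surrogate row hash over the non-timestamp columns; a minimal sketch of that idea (an assumption on my part, untested, and assuming df_duplicates is non-empty since the IN list is built from it):
from pyspark.sql.functions import sha2, concat_ws, col
# hash every non-timestamp column into one surrogate key per row
row_hash_sql = "sha2(concat_ws('||', {}), 256)".format(
    ', '.join(f"cast({c} as string)" for c in all_columns_but_the_timestamp))
seen_hashes = [r[0] for r in df_duplicates.select(
    sha2(concat_ws('||', *[col(c).cast('string') for c in all_columns_but_the_timestamp]), 256)
).collect()]
# one bulk UPDATE instead of one per row (note: concat_ws skips NULLs, so
# NULL-heavy rows could collide, and a huge IN list has its own cost)
table.update(
    f"{row_hash_sql} IN ({', '.join(repr(h) for h in seen_hashes)})",
    {'ts': lit(new_timestamp).cast(TimestampType())})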
Or is there another option I'm missing?
A: If I understand correctly, you are trying to merge, i.e. insert or update (an upsert).
Use MERGE INTO whenever possible; even traditional databases have an equivalent SQL statement. With the Delta Lake Python API it looks like this:
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "your tablehere..")
# Match on every column except the timestamp: update ts for rows already seen,
# insert the rest with a fresh ts.
delta_table.alias("existing").merge(
    df_update.alias("updates"),
    " AND ".join([f"existing.{col} = updates.{col}" for col in all_columns_but_the_timestamp])
).whenMatchedUpdate(set={
    "ts": "current_timestamp()"
}).whenNotMatchedInsert(values={
    **{col: f"updates.{col}" for col in all_columns_but_the_timestamp},
    "ts": "current_timestamp()"
}).execute()
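The same upsert can also be expressed as a Spark SQL MERGE statement; a minimal sketch, assuming placeholder table and key column names (my_table, col_a, col_b):
# register the daily dump as a temporary view so SQL can reference it
df_update.createOrReplaceTempView("updates")
spark.sql("""
    MERGE INTO my_table AS existing
    USING updates
    ON existing.col_a = updates.col_a AND existing.col_b = updates.col_b
    WHEN MATCHED THEN UPDATE SET ts = current_timestamp()
    WHEN NOT MATCHED THEN INSERT (col_a, col_b, ts)
        VALUES (updates.col_a, updates.col_b, current_timestamp())
""")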