pyspark, azure-synapse, delta-lake

Efficiently updating a single column value for many rows in MS Fabric / pyspark / delta


I have a data set on a pretty small Microsoft Fabric capacity (which, if you don't know it, is basically Azure Synapse, which is basically Apache Spark).

Due to limitations of the data source, I am basically getting a full dump every day. So I need to update a "last seen" timestamp on rows I already know (i.e. identical data) and append the changed ones.
This is not hard to do, but I am looking for the most efficient option.

Loading the new data into df_update and the existing data into df_existing, I have tried two ways of doing this:

-- 1 -- Using PySpark DataFrames:

I can solve the task with an outer join like

from pyspark.sql.functions import coalesce

# Take the timestamp from the new dump where the row appears in it,
# otherwise keep the old one.
df_new = df_existing\
    .withColumnRenamed('ts', 'ts_old')\
    .join(df_update, on=all_columns_but_the_timestamp, how='outer')
return df_new\
    .withColumn('ts', coalesce(df_new['ts'], df_new['ts_old']))\
    .drop('ts_old')
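
Writing df_new back then means a full overwrite of the Delta table, roughly like this (the table name is just a placeholder):

df_new.write\
    .format('delta')\
    .mode('overwrite')\
    .saveAsTable('my_table')  # placeholder table name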

Unfortunately, this requires me to replace the whole table on disk. That's slow and seems to upset OneLake a bit (it takes additional time before the updated data shows up in queries). Therefore I tried:

-- 2 -- Using Delta Lake update

By using

# rows from the dump that are new or changed
df_new = df_update.exceptAll(df_existing.select(all_columns_but_the_timestamp))
# rows that already exist unchanged and only need a fresh timestamp
df_duplicates = df_update.exceptAll(df_new)

I can get the new and the revisited data.

for row in df_duplicates.collect():
    # one UPDATE per existing row, matching on every column value
    table.update(
            ' AND '.join([f'{k} = "{v}"' for k, v in row.asDict().items()]),
            {'ts': lit(new_timestamp).cast(TimestampType())})

is a woefully slow way to do the updates. df_new can just be appended to the table afterwards.
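
The append of df_new itself is cheap; roughly like this (again, the table name is a placeholder, and new_timestamp is the timestamp of the current load):

from pyspark.sql.functions import lit
from pyspark.sql.types import TimestampType

df_new\
    .withColumn('ts', lit(new_timestamp).cast(TimestampType()))\
    .write\
    .format('delta')\
    .mode('append')\
    .saveAsTable('my_table')  # placeholder table name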

I have looked for

-- 3 -- Delta Lake update in bulk

Somehow selecting all affected rows in one go and updating the value:

table.update(
    some_very_neat_condition,
    {'ts': lit(new_timestamp).cast(TimestampType())})

However, since I don't have reliable IDs, I don't know how to do that.

Or is there another option I'm missing?


Solution

  • Q: I need to update a "last seen" timestamp on rows I already know (i.e. identical data), and append the changed ones.

    If I understand correctly, you are trying to merge, i.e. insert or update (an upsert).

    Use MERGE INTO whenever possible; even traditional databases have an equivalent SQL statement. With the Delta Lake Python API, it looks like this:

    from delta.tables import DeltaTable

    # Point at the existing Delta table (or use DeltaTable.forName)
    delta_table = DeltaTable.forPath(spark, "your table path here...")

    # Match on every column except the timestamp:
    # refresh ts for rows that already exist, insert everything else.
    delta_table.alias("existing").merge(
        df_update.alias("updates"),
        " AND ".join([f"existing.{col} = updates.{col}" for col in all_columns_but_the_timestamp])
    ).whenMatchedUpdate(set={
        "ts": "current_timestamp()"
    }).whenNotMatchedInsert(values={
        **{col: f"updates.{col}" for col in all_columns_but_the_timestamp},
        "ts": "current_timestamp()"
    }).execute()
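
    If you prefer to write the merge as SQL instead of the Python API, the same upsert can be run via spark.sql. This is only a rough sketch: my_table and the columns col_a/col_b are placeholders for your actual table and the columns in all_columns_but_the_timestamp.

    # Expose the daily dump to Spark SQL under a temporary name
    df_update.createOrReplaceTempView("updates")

    spark.sql("""
        MERGE INTO my_table AS existing
        USING updates
        ON existing.col_a = updates.col_a
           AND existing.col_b = updates.col_b
        WHEN MATCHED THEN
            UPDATE SET ts = current_timestamp()
        WHEN NOT MATCHED THEN
            INSERT (col_a, col_b, ts)
            VALUES (updates.col_a, updates.col_b, current_timestamp())
    """)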