pyspark, databricks, delta-lake

DeltaLake/DeltaTable merge operation inserts/duplicates matched rows instead of updating them


The merge below is expected to update matched rows, but it inserts duplicates instead. The following paths point to two DeltaTables, ake_original and ake_updates:

from deltalake import DeltaTable, write_deltalake
import pyarrow.dataset as ds

# Target table and the dataset holding the updated rows
deltaTable = DeltaTable('ake_original')
dataset_update = ds.dataset('ake_updates')
df = dataset_update.to_table()

(
    deltaTable.merge(
        source=df,
        predicate="s.AKE_ID = t.AKE_ID",
        source_alias="s",
        target_alias="t",
    )
    .when_matched_update_all()      # rows whose AKE_ID matches should be updated
    .when_not_matched_insert_all()  # rows with new AKE_IDs should be inserted
    .execute()
)

print(deltaTable.history())

Solution

  • I took a look at the _delta_log of the original table. You were not doing inserts; you were doing overwrites, so the original table is not 59767 records long, just 9767.

    For example, here are the changes made by the 0th and the 1st revisions:

    ...0000.json
    {"add":{"path":"0-0869ed37-2f6c-4987-a403-f9735a6d1eba-0.parquet", ....

    ...0001.json
    {"add":{"path":"1-b46332c9-3183-40d9-a78c-89dbffbd430e-0.parquet"...
    {"remove":{"path":"0-0869ed37-2f6c-4987-a403-f9735a6d1eba-0.parquet",....{"mode":"Overwrite","partitionBy":"[]"...

    The 1st revision removes the file created by the 0th revision and adds a new one, but that new file holds just 10000 records.

    You can check the rest of the revisions in the same way: each one removes the previous file and creates a new one (see the sketch below).
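
    If you do not want to open the log files by hand, here is a minimal sketch that prints the add and remove actions recorded in each commit. It assumes the table path ake_original from the question; each commit file in _delta_log is newline-delimited JSON with one action per line.

    import json
    from pathlib import Path

    # Walk the transaction log and print which data files each commit
    # adds or removes ('ake_original' is the table path from above).
    for commit in sorted(Path('ake_original', '_delta_log').glob('*.json')):
        print(commit.name)
        for line in commit.read_text().splitlines():
            action = json.loads(line)
            for key in ('add', 'remove'):
                if key in action:
                    print(f"  {key}: {action[key]['path']}")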

    Solution: when you are populating the original table, you should use mode="append" (see below).
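
    A minimal sketch of the append-mode write, assuming each batch of source data arrives as a PyArrow table (the column names here are illustrative):

    import pyarrow as pa
    from deltalake import write_deltalake

    batch = pa.table({"AKE_ID": [1, 2, 3], "value": ["a", "b", "c"]})

    # mode="append" adds new parquet files on top of the existing ones;
    # mode="overwrite" is what produced the remove actions shown above.
    write_deltalake('ake_original', batch, mode="append")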


    To recover the overwritten records (I have never done this myself, but you can try):
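
    As long as the table has not been vacuumed, the removed parquet files still exist on disk, so earlier versions can be read back via time travel. A hedged sketch (version 0 here is the first commit from the log above):

    from deltalake import DeltaTable

    # Load the table as it was at version 0 and materialize it.
    old = DeltaTable('ake_original', version=0)
    recovered = old.to_pyarrow_table()

    # An existing instance can also be pointed at a version:
    # deltaTable.load_as_version(0)   # load_version() in older releases

    You could then write the recovered records into a fresh table with mode="append".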