The following URLs are for two DeltaTables ake_original and ake_updates:
from deltalake import DeltaTable, write_deltalake
import pyarrow.dataset as ds

deltaTable = DeltaTable('ake_original')
dataset_update = ds.dataset('ake_updates')
df = dataset_update.to_table()

(
    deltaTable.merge(
        source=df,
        predicate="s.AKE_ID = t.AKE_ID",
        source_alias="s",
        target_alias="t",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)

print(deltaTable.history())
I took a look at the _delta_log of the original database. You were not doing inserts. You were doing overwrites, so the original database is not 59767 records long, but just 9767.
For example, here are the changes made by the 0th and 1st revisions:
...0000.json
{"add":{"path":"0-0869ed37-2f6c-4987-a403-f9735a6d1eba-0.parquet", ....
...0001.json
{"add":{"path":"1-b46332c9-3183-40d9-a78c-89dbffbd430e-0.parquet"...
{"remove":{"path":"0-0869ed37-2f6c-4987-a403-f9735a6d1eba-0.parquet",....{"mode":"Overwrite","partitionBy":"[]"...
The 1st revision removes the file created by the 0th revision and adds a new one, but the new file holds just 10000 records.
You can check the rest: each revision removes the previous file and creates a new one.
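If you would rather not read every commit file by hand, a small script can list what each revision added and removed. This is only a sketch using the standard library. The function name summarize_delta_log is made up for illustration, and it assumes the write mode sits under commitInfo.operationParameters.mode, as the excerpt above suggests.

```python
import json
from pathlib import Path


def summarize_delta_log(log_dir):
    """Return one summary dict per commit file in a _delta_log folder."""
    summaries = []
    # Commit files are zero-padded, so sorting by name sorts by version.
    for commit in sorted(Path(log_dir).glob("*.json")):
        if not commit.stem.isdigit():
            continue  # skip anything that is not a plain commit file
        added, removed, mode = [], [], None
        # Each line of a commit file is a separate JSON action.
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "add" in action:
                added.append(action["add"]["path"])
            elif "remove" in action:
                removed.append(action["remove"]["path"])
            elif "commitInfo" in action:
                # Assumed layout: the write mode lives in operationParameters.
                params = action["commitInfo"].get("operationParameters", {})
                mode = params.get("mode")
        summaries.append(
            {
                "version": int(commit.stem),
                "added": added,
                "removed": removed,
                "mode": mode,
            }
        )
    return summaries
```

Calling summarize_delta_log("ake_original/_delta_log") and printing each entry should show, for every revision after the 0th, one removed file and one added file if the table really was overwritten each time.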
Solution: when you are populating the original database, you should use mode="append".
To recover the overwritten records (I have never done this myself, but you can try):
original/_delta_log folder