I have data in a Delta lake WITHOUT a timestamp on each row to determine when that row was added/modified, but I only need rows that were created/modified after a specified date/time.
I want the latest version of the data from the Delta lake, but with the "_commit_timestamp" column from the change data feed appended to the dataframe read from the Delta lake, so that I can select only the data written after the specified date/time instead of the whole data set.
The change data feed returns all modifications to a row, i.e. inserts/deletions/updates, so there can be multiple rows for the same underlying row across versions.
Is there a way of getting just the latest version of each row with the "_commit_timestamp" appended?
One approach: alter the Delta table first, then implement a merge operation.
Altering adds the new column to the existing table.
The merge then pulls the updated timestamps in from the new version (see the sketch after the code below).
from pyspark.sql.functions import lit

# Read the existing table, add the new column, and overwrite the table in place
(spark.read.format("delta").load('yourpath')
    .withColumn("Recovered", lit(''))
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save('yourpath'))
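The merge step could then look like the following. This is a minimal sketch using the DeltaTable Python API, assuming a key column named id and a dataframe updates holding the new rows (both hypothetical; substitute your own key and source):

from delta.tables import DeltaTable

# The target table that was rewritten above
target = DeltaTable.forPath(spark, 'yourpath')

# Upsert: update rows whose key matches, insert the rest
# ("id" and "updates" are assumptions, not from the original answer)
(target.alias("t")
    .merge(updates.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())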
When creating the table, enable the change data feed with the following query:

CREATE TABLE tablename (variables) TBLPROPERTIES (delta.enableChangeDataFeed = true)

The delta.enableChangeDataFeed = true property makes Delta record row-level changes to the table from that point on.
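If the table already exists, as in your case, the same property can be switched on afterwards. A minimal sketch via spark.sql (myDeltaTable stands in for your table name):

# Enable the change data feed on an existing table
spark.sql("ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")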
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Use the above command to enable the property once as a default for all new tables created in the session.
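The same session default can also be set from Python. A minimal equivalent, assuming an active SparkSession named spark:

# Apply the session-level default from Python instead of SQL
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")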
To read the change data feed in Python, use the following patterns.
# version as ints or longs
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# path based tables
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.load("pathToMyDeltaTable")
For patterns in other languages, check the link below.
https://learn.microsoft.com/en-us/azure/databricks/delta/delta-change-data-feed
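To then get just the latest version of each row with "_commit_timestamp" appended, you can deduplicate the change feed with a window function. A minimal sketch, assuming a key column named id (hypothetical; substitute your table's key) and the built-in CDF columns _change_type, _commit_version and _commit_timestamp:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Read every change committed after the cutoff time
changes = (spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", '2021-04-21 05:45:46')
    .table("myDeltaTable")
    # update_preimage rows carry the values *before* an update, so drop them
    .filter(F.col("_change_type") != "update_preimage"))

# Keep only the most recent change per key ("id" is an assumed key column)
w = Window.partitionBy("id").orderBy(F.col("_commit_version").desc())
latest = (changes
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn")
    # Rows whose latest change was a delete no longer exist in the table
    .filter(F.col("_change_type") != "delete"))

Each surviving row keeps its _commit_timestamp, so you can filter or select on it directly.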