scala, azure-databricks, delta-lake

Append the "_commit_timestamp" Column to the Latest Data Version When Reading from a DeltaTable


I have data in a Delta lake WITHOUT a timestamp on each row to indicate when that row was added/modified, but I only need rows that were created/modified after a specified date/time.

I want the latest version of the data from the Delta lake, but with the "_commit_timestamp" column from the change data feed appended to the DataFrame, so that I can select only the data written after a specified date/time instead of the whole data set.

The change data feed returns every modification to a row, i.e. insert/update/delete, so there can be multiple entries for the same row across versions.

Is there a way of getting just the latest version of each row with the "_commit_timestamp" appended?


Solution

  • Alter the Delta table first, then implement the merge operation.

    1. Altering adds the new column to the existing table.

    2. Merging brings the updated timestamps over from the new version (a sketch of this step follows the snippet below).

      from pyspark.sql.functions import lit

      # Rewrite the table with the new column added; overwriteSchema
      # lets the write change the existing schema.
      (spark.read.format("delta").load("yourpath")
          .withColumn("Recovered", lit(""))
          .write
          .format("delta")
          .mode("overwrite")
          .option("overwriteSchema", "true")
          .save("yourpath"))
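
    A minimal sketch of the merge step, assuming a hypothetical key column id and a hypothetical source path for the new version of the rows (neither name is from the original answer):

      from delta.tables import DeltaTable

      # Hypothetical DataFrame holding the new version of the rows
      updates = spark.read.format("delta").load("path-to-new-version")

      target = DeltaTable.forPath(spark, "yourpath")

      # Upsert keyed on the hypothetical "id" column: matched rows take
      # the new values, unmatched rows are inserted.
      (target.alias("t")
          .merge(updates.alias("s"), "t.id = s.id")
          .whenMatchedUpdateAll()
          .whenNotMatchedInsertAll()
          .execute())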

    When creating the table, use the following query to enable the change data feed:

    CREATE TABLE tablename (variables) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    

    delta.enableChangeDataFeed = true makes Delta record row-level change data for the table after it is created, which is what the change data feed reads.
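
    If the table already exists, the change data feed can also be switched on afterwards; the documented way is ALTER TABLE with the same property (tablename is a placeholder):

    ALTER TABLE tablename SET TBLPROPERTIES (delta.enableChangeDataFeed = true)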

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

    The above command enables the change data feed by default for every new table created in the session, so the property does not have to be set table by table.

    To read the change data feed in Python, use the patterns below.

    # version as ints or longs
    spark.read.format("delta") \
      .option("readChangeFeed", "true") \
      .option("startingVersion", 0) \
      .option("endingVersion", 10) \
      .table("myDeltaTable")
    
    # timestamps as formatted timestamp strings
    spark.read.format("delta") \
      .option("readChangeFeed", "true") \
      .option("startingTimestamp", '2021-04-21 05:45:46') \
      .option("endingTimestamp", '2021-05-21 12:00:00') \
      .table("myDeltaTable")
    
    # providing only the startingVersion/timestamp
    spark.read.format("delta") \
      .option("readChangeFeed", "true") \
      .option("startingVersion", 0) \
      .table("myDeltaTable")
    
    # path-based tables
    spark.read.format("delta") \
      .option("readChangeFeed", "true") \
      .option("startingTimestamp", '2021-04-21 05:45:46') \
      .load("pathToMyDeltaTable")
    

    For the same patterns in other languages, see the link below:

    https://learn.microsoft.com/en-us/azure/databricks/delta/delta-change-data-feed