pyspark, spark-streaming, azure-databricks

Spark streaming with delta tables - update input table in microbatch


With two Delta tables (tableA, tableB) as input to the streaming pipeline, I want to achieve the following:

  1. Processing starts when new rows appear in tableA (and not when tableB is updated)
  2. mergedTable = tableA.join(tableB, ...., "inner")
  3. do some transformations on mergedTable
  4. Based on the transformations, append new rows to tableB

I started with the following:

tableA = spark.readStream.format("delta").load(path_to_tableA)
tableB = spark.readStream.format("delta").load(path_to_tableB)
mergedTable = tableA.join(tableB, ...., "inner")

def process_microbatch(df, batch_id):
    # ...transformations on df...
    df.write.mode("append").format("delta").save(path_to_tableB)

mergedTable.writeStream.foreachBatch(process_microbatch).start()

How can I make sure that only updates to tableA trigger microbatch processing? It is of course also important that the new rows of tableB are recognized in step 2 within the next batch.


Solution

  • Read only tableA as a streaming source, so that only new rows in tableA trigger a microbatch. If tableB is loaded just once at the beginning of the stream and not reloaded thereafter, rows appended to it in one microbatch will not be visible in subsequent microbatches. To address this, reload tableB as a static (batch) DataFrame in each microbatch so that it includes the rows appended in the previous microbatch.

    So, reload tableB within the process_microbatch function.

    Here is the code.

    # Read tableA as the only streaming source, so that only new rows
    # in tableA trigger a microbatch
    tableA = spark.readStream.format("delta").load(path_to_tableA)
    
    # Define the processing function
    def process_microbatch(df, batch_id):
    
        # Re-read tableB as a static DataFrame within the processing function,
        # so rows appended by earlier microbatches are visible here
        tableB = spark.read.format("delta").load(path_to_tableB)
        # Join the microbatch of tableA with the current state of tableB
        mergedTable = df.join(tableB, ...., "inner")
        
        transformed_df = ...
        # Append the transformed rows back to tableB
        transformed_df.write.mode("append").format("delta").save(path_to_tableB)
    
    
    streamingQuery = tableA.writeStream.foreachBatch(process_microbatch).start()
    streamingQuery.awaitTermination()
    

    By reloading tableB within each microbatch, you ensure that any changes made to it in previous microbatches are considered in subsequent microbatches.
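
    For completeness, here is a minimal runnable sketch of the same pattern with the placeholders filled in. The paths, the join key (id), and the column names used in the transformation are hypothetical; adjust them to your own schema. A checkpoint location is added so the stream can resume from where it left off after a restart.

    from pyspark.sql import functions as F

    # Hypothetical locations; replace with your own
    path_to_tableA = "/mnt/delta/tableA"
    path_to_tableB = "/mnt/delta/tableB"
    checkpoint_path = "/mnt/checkpoints/tableA_to_tableB"

    # tableA is the only streaming source, so only new rows in tableA
    # trigger a microbatch
    tableA = spark.readStream.format("delta").load(path_to_tableA)

    def process_microbatch(df, batch_id):
        # Re-read tableB as a static DataFrame so rows appended by
        # earlier microbatches are visible in this one
        tableB = spark.read.format("delta").load(path_to_tableB)

        # Hypothetical join key and transformation; the output columns
        # must match tableB's schema because the result is appended to it
        merged = df.join(tableB, on="id", how="inner")
        transformed_df = merged.select(
            "id",
            (F.col("amount_a") + F.col("amount_b")).alias("amount_b"),
        )

        transformed_df.write.mode("append").format("delta").save(path_to_tableB)

    streamingQuery = (
        tableA.writeStream
        .foreachBatch(process_microbatch)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
    streamingQuery.awaitTermination()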