With two Delta tables (tableA, tableB) as input to the streaming pipeline, I want to achieve the following:
I started with the following:
tableA = spark.readStream.format("delta").load(path_to_tableA)
tableB = spark.readStream.format("delta").load(path_to_tableB)
mergedTable = tableA.join(tableB, ...., "inner")
def process_microbatch(df, batch_id):
    # ...transformations on df...
    df.write.mode("append").saveAsTable(path_to_tableB)
mergedTable.writeStream.foreachBatch(process_microbatch).start()
How can I make sure that only the updates of tableA trigger microbatch processing? It would of course also be important that the new rows of tableB are recognized in point 2 within the next batch.
If tableB is loaded only once at the beginning of the stream and not updated thereafter, any changes made to it within a microbatch will not be reflected in subsequent microbatches. To address this, you need to ensure that tableB is reloaded in each microbatch so that it includes the updates made in the previous microbatch.

So, reload tableB within the process_microbatch function.
Here is the code.
# Read tableA as a streaming source
tableA = spark.readStream.format("delta").load(path_to_tableA)
# Define the processing function
def process_microbatch(df, batch_id):
    # Re-read tableB so rows appended in earlier microbatches are visible
    tableB = spark.read.format("delta").load(path_to_tableB)
    # Join the current microbatch of tableA with the latest snapshot of tableB
    mergedTable = df.join(tableB, ...., "inner")
    # ...transformations on mergedTable...
    transformed_df = ...
    # Append the results back to tableB
    transformed_df.write.mode("append").format("delta").save(path_to_tableB)
streamingQuery = tableA.writeStream.foreachBatch(process_microbatch).start()
streamingQuery.awaitTermination()
By reloading tableB within each microbatch, you ensure that any changes made to it in previous microbatches are considered in subsequent microbatches.
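For a long-running query, you would normally also give the stream a checkpoint location so it can resume from the last committed microbatch after a restart. Here is a minimal sketch, assuming a hypothetical path_to_checkpoint variable:

# Minimal sketch: same query as above, but with a checkpoint location so the
# stream can recover its progress on restart (path_to_checkpoint is hypothetical).
streamingQuery = (
    tableA.writeStream
    .option("checkpointLocation", path_to_checkpoint)
    .foreachBatch(process_microbatch)
    .start()
)
streamingQuery.awaitTermination()

Note that foreachBatch provides at-least-once write guarantees, so a microbatch may be reprocessed after a failure; if that matters for the append into tableB, the batch_id argument can be used to deduplicate.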