I am using Structured Streaming in Databricks to load batch files into a Unity Catalog (UC) table. It works, but if the foreachBatch does not finish within 60 seconds it produces the following error:
Force terminating query xxxxxxxxxxxxx due to not receiving any updates in 60 seconds. Spark Session ID is yyyyyyyyyyy
Timedout query xxxxxxxxxxxxx terminated
Here is the full code:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType  # Import necessary types
from delta.tables import DeltaTable
from pyspark.sql.functions import lit

if debug:
    print('Streaming enabled: read')

# Read the data into a stream with Auto Loader
landing_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .option("mergeSchema", "true") \
    .load(source_path)

landing_df = dropMetaColumns(landing_df)


def merge_batches(batchDF, batch_id):
    batchDF = get_latest_records(batchDF)

    # Check if the Delta table exists
    if not table_exists:
        print('Initial Load')

        # Add metadata columns
        batchDF = add_metadata_bulkload(batchDF)

        batchDF.write \
            .mode('overwrite') \
            .option('overwriteSchema', 'true') \
            .saveAsTable(destination_table)
    else:
        print('Merge Load')

        # If the Delta table exists, merge the data
        deltaTable = DeltaTable.forName(spark, destination_table)
        merge_condition = get_merge_condition(primary_key)

        # Add any columns that exist in the source but not in the Delta table
        add_missing_columns(destination_table, batchDF)

        # Get columns in the target Delta table
        existing_columns = [field.name for field in spark.table(destination_table).schema.fields]

        # Build the update and insert column mappings for the merge
        update_columns = get_upsert_condition(batchDF, loadAction.UPDATE.name, existing_columns)
        insert_columns = get_upsert_condition(batchDF, loadAction.INSERT.name, existing_columns)

        # Perform the merge operation for each batch
        deltaTable.alias("target").merge(
            batchDF.alias("source"),
            merge_condition
        ) \
        .whenMatchedUpdate(set=update_columns) \
        .whenNotMatchedInsert(values=insert_columns) \
        .execute()


# Stream data in micro-batches, merging it into the Delta table
landing_df.writeStream \
    .foreachBatch(merge_batches) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .option("mergeSchema", "true") \
    .start() \
    .awaitTermination()
However, even though it raises the error, the data does appear to be loaded. I am not sure whether the query simply carries on after the error because the session is still running.
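One rough way to check whether the query really keeps running after the error is to keep the handle returned by start() and inspect it, instead of chaining awaitTermination() directly. This is only a diagnostic sketch, not part of the pipeline above:

# Sketch: capture the query handle instead of chaining .awaitTermination()
query = landing_df.writeStream \
    .foreachBatch(merge_batches) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .option("mergeSchema", "true") \
    .start()

# After the 60-second message appears, inspect the query
print(query.status)        # current state, e.g. whether a trigger is active
print(query.lastProgress)  # metrics for the most recent micro-batch
print(query.exception())   # None unless the query has actually failed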
I have tried adding a timeout value to awaitTermination(), but it does not seem to make any difference.
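For reference, the timeout variant looks roughly like this (reusing the query handle from the sketch above; the 600-second value is arbitrary):

# awaitTermination with a timeout returns instead of blocking indefinitely:
# True if the query stopped within the timeout, False otherwise.
finished = query.awaitTermination(600)
print(f"Query terminated within timeout: {finished}")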
This turned out to be a Databricks infrastructure bug that has since been resolved. It did not require any code or configuration change.