apache-spark, databricks, azure-databricks, spark-structured-streaming

Force terminating query xxxxxx due to not receiving any updates in 60 seconds


I am using Structured Streaming in Databricks to load batch files into a Unity Catalog (UC) table. It works; however, if the foreachBatch function does not finish within 60 seconds, the following error is raised:

Force terminating query xxxxxxxxxxxxx due to not receiving any updates in 60 seconds. Spark Session ID is yyyyyyyyyyy
Timedout query xxxxxxxxxxxxx terminated

Here is the full code:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType  # Import necessary types

from delta.tables import DeltaTable
from pyspark.sql.functions import lit

if debug:
    print('Streaming enabled: read')

# Read the landing files as a stream using Auto Loader
landing_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .option("mergeSchema", "true") \
    .load(source_path)

landing_df = dropMetaColumns(landing_df)

def merge_batches(batchDF, batch_id):
    batchDF = get_latest_records(batchDF)

    # Check if the Delta table exists
    if not table_exists:
        print('Initial Load')

        # Add metadata columns
        batchDF = add_metadata_bulkload(batchDF)

        batchDF.write \
            .mode('overwrite') \
            .option('overwriteSchema', 'true') \
            .saveAsTable(destination_table)
    else:
        print('Merge Load')

        # If the Delta table exists, merge the data
        deltaTable = DeltaTable.forName(spark, destination_table)
        merge_condition = get_merge_condition(primary_key)

        # Add any columns to the target table that exist in the source but not in Delta
        add_missing_columns(destination_table, batchDF)

        # Get columns in the target Delta table
        existing_columns = [field.name for field in spark.table(destination_table).schema.fields]

        # Build the update and insert column mappings for the merge
        update_columns = get_upsert_condition(batchDF, loadAction.UPDATE.name, existing_columns)
        insert_columns = get_upsert_condition(batchDF, loadAction.INSERT.name, existing_columns)

        # Perform the merge operation for each micro-batch
        deltaTable.alias("target").merge(
            batchDF.alias("source"),
            merge_condition
        ) \
        .whenMatchedUpdate(set=update_columns) \
        .whenNotMatchedInsert(values=insert_columns) \
        .execute()

# Stream data in micro-batches, merging each one into the Delta table
landing_df.writeStream \
    .foreachBatch(merge_batches) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .option("mergeSchema", "true") \
    .start() \
    .awaitTermination()

However, even though the error is raised, the data does appear to be loaded. I am not sure whether the query simply continues past the error because the session is still running.
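
One way to check whether the last micro-batch actually committed despite the watchdog message is to look at the Delta history of the target table after the run. A minimal sketch, reusing the destination_table variable from the code above:

from delta.tables import DeltaTable

# Inspect the most recent commits on the target table; a successful
# micro-batch upsert should show up as a MERGE operation.
history_df = DeltaTable.forName(spark, destination_table) \
    .history(5) \
    .select("version", "timestamp", "operation", "operationMetrics")

history_df.show(truncate=False)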

I have tried adding a timeout value to awaitTermination(), but it does not appear to make any difference.
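
For reference, awaitTermination(timeout) only bounds how long the driver blocks on that call (the timeout is in seconds and the method returns a boolean); it does not configure the query itself. A minimal sketch of using it with the handle returned by start(), reusing the names from the code above:

query = landing_df.writeStream \
    .foreachBatch(merge_batches) \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .start()

# Block for up to 120 seconds; True means the query terminated, False means it is still running
finished = query.awaitTermination(120)
if not finished:
    print('Query still running; last reported progress:')
    print(query.lastProgress)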


Solution

  • This was a Databricks infrastructure bug that has since been resolved. It did not require any code or configuration changes.