apache-spark pyspark databricks spark-structured-streaming

Multiple Sinks Processing not persisting in Databricks Community Edition


I am just trying to use the Rate source with Structured Streaming, so as to write to multiple table names per micro-batch. I.e. just refreshing the multiple-sinks logic in PySpark in prep for some certification.

No errors, but no persistence is occurring. It has been a while since I looked at this; it must be something basic.

The code below runs on Databricks Community Edition, with no Hive catalog. Basic stuff.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit

spark = SparkSession.builder \
    .appName("SimulateKAFKAandMultipleSinks") \
    .getOrCreate()

rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .load()

message_stream = rate_stream.select(
    (rate_stream["value"] + 1000).alias("message_id"),              
    rate_stream["timestamp"].alias("event_time"),                    
    (concat(lit("T"), rate_stream["value"] % 5)).alias("table_id")   
)

def append_to_parquet(df, table_id):
    table_path = f"/mnt/parquet/{table_id}"   

    df.write \
        .format("parquet") \
        .mode("append") \
        .option("path", table_path) \
        .save()

def process_batch(df, batch_id):
    partitioned_df = df.repartition("table_id")
    def process_partition(iterator):
        for partition in iterator:
            first_row_value = df.first()
            table_id_value = first_row_value['table_id']
            print(f"Writing partition for table_id: {table_id_value}") 
            partition_df = partition.filter(col("table_id") == table_id_value)
            append_to_parquet(partition_df, table_id_value)

    partitioned_df.rdd.mapPartitions(process_partition)

query = message_stream.writeStream \
    .foreachBatch(process_batch) \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/parquet/checkpoints/") \
    .start()

query.awaitTermination()

Update: an oversight on my side, but educational enough. It seemed this could only be done with batch Kafka, not via Structured Streaming. NB: that is not, strictly speaking, correct - see the solution below.
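For reference, a minimal sketch of what the batch-Kafka variant of the same fan-out might look like. The broker address, topic name, key-based table_id derivation and output paths are all assumptions, not taken from the original setup:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("BatchKafkaMultipleSinks") \
    .getOrCreate()

# Batch (non-streaming) read from Kafka -- broker and topic are placeholders
kafka_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "some_topic") \
    .load()

# How table_id is derived is application-specific; here it is assumed to be the message key
messages = kafka_df.select(
    col("key").cast("string").alias("table_id"),
    col("value").cast("string").alias("payload"),
    col("timestamp").alias("event_time")
)

# Same fan-out as in the question, but as a plain batch job:
# collect the distinct ids on the driver, then filter and append per id
for row in messages.select("table_id").distinct().collect():
    table_id_value = row["table_id"]
    messages.filter(col("table_id") == table_id_value) \
        .write \
        .mode("append") \
        .parquet(f"/mnt/parquet/{table_id_value}")  # path pattern assumed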


Solution

  • So, the mistake I made was to forget how RDDs behave inside Structured Streaming. In the question's process_batch, partitioned_df.rdd.mapPartitions(process_partition) is a lazy transformation and no action is ever called on it, so the per-partition writes never execute (and driver-side DataFrame calls such as df.first() cannot be used inside worker code anyway). The question's logic works for batch Kafka.

    But for Structured Streaming using parquet files as the sink, as opposed to the managed-tables paradigm, the following will work as one of a number of possible approaches:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, concat, lit
    
    spark = SparkSession.builder \
        .appName("SimulateKAFKAandMultipleSinksParquet") \
        .config("spark.sql.legacy.pathOptionBehavior.enabled", "true") \
        .getOrCreate()
    
    rate_stream = spark.readStream \
        .format("rate") \
        .option("rowsPerSecond", 1) \
        .load()
    
    message_stream = rate_stream.select((rate_stream["value"] + 1000).alias("message_id"),              
                                         rate_stream["timestamp"].alias("event_time"),                    
                                         (concat(lit("path_to"), rate_stream["value"] % 5)).alias("table_id")   
    )
    
    # Append to a parquet path; a table definition can be created over it afterwards
    def append_data_dynamically(df, table_id):
        table_path = f"/GED_SO/parquet/{table_id}"   
        df.write \
          .format("parquet") \
          .mode("append") \
          .option("path", table_path) \
          .save()
    
    # For a given micro-batch this is relatively expensive -- distinct() + collect() on the driver -- but it is required here.
    def process_batch(df, batch_id):
        df = df.cache()  # the micro-batch is scanned once per table_id
        list_table_ids = df.select("table_id").distinct().collect()
        for unique_table_id in list_table_ids:
            table_id_value = unique_table_id['table_id']
            table_data_df = df.filter(col("table_id") == table_id_value)
            print(f"Writing partition for table_id: {table_id_value}")
            append_data_dynamically(table_data_df, table_id_value)
        df.unpersist()  # release the cached micro-batch
    
    query = message_stream.writeStream \
        .foreachBatch(process_batch) \
        .outputMode("append") \
        .option("checkpointLocation", "/checkpoints/") \
        .start()
    
    query.awaitTermination()
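
    For completeness, one of the other possible approaches alluded to above: if separate top-level paths per table are not needed, the built-in file sink can do the fan-out itself with partitionBy, giving one sub-directory per table_id under a single base path. The paths below are placeholders, not from the original setup:

    query = message_stream.writeStream \
        .format("parquet") \
        .partitionBy("table_id") \
        .option("path", "/GED_SO/parquet_partitioned/") \
        .option("checkpointLocation", "/checkpoints_partitioned/") \
        .outputMode("append") \
        .start()

    query.awaitTermination()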