apache-spark, pyspark, databricks, spark-structured-streaming

Multiple Sinks Processing not persisting in Databricks Community Edition


I am trying to use the Rate source with Structured Streaming to write to multiple table names per micro-batch, i.e. just refreshing the multiple-sinks logic in PySpark in preparation for some certification.

No errors, but no persistence is occurring. It has been a while since I last looked at this; it must be something basic.

Coding as follows on Databricks Community Edition, with no Hive catalog. Basic stuff.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit

spark = SparkSession.builder \
    .appName("SimulateKAFKAandMultipleSinks") \
    .getOrCreate()

rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .load()

message_stream = rate_stream.select(
    (rate_stream["value"] + 1000).alias("message_id"),                # synthetic message id
    rate_stream["timestamp"].alias("event_time"),                     # event timestamp from the rate source
    (concat(lit("T"), rate_stream["value"] % 5)).alias("table_id")    # target table name, T0..T4
)

def append_to_parquet(df, table_id):
    table_path = f"/mnt/parquet/{table_id}"   

    df.write \
        .format("parquet") \
        .mode("append") \
        .option("path", table_path) \
        .save()

def process_batch(df, batch_id):
    partitioned_df = df.repartition("table_id")
    def process_partition(iterator):
        for partition in iterator:
            first_row_value = df.first()
            table_id_value = first_row_value['table_id']
            print(f"Writing partition for table_id: {table_id_value}") 
            partition_df = partition.filter(col("table_id") == table_id_value)
            append_to_parquet(partition_df, table_id_value)

    partitioned_df.rdd.mapPartitions(process_partition)

query = message_stream.writeStream \
    .foreachBatch(process_batch) \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/parquet/checkpoints/") \
    .start()

query.awaitTermination()

Update. An oversight on my side, but educational enough. This approach can be done with batch Kafka, but not via Structured Streaming.
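
For completeness, a minimal sketch of that batch-Kafka variant. The broker address, topic name, and the assumption that the Kafka message key carries the target table id are all placeholders, not part of the original setup:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("BatchKafkaMultipleSinks").getOrCreate()

# spark.read (not readStream) against Kafka returns a static DataFrame,
# so ordinary actions such as collect() and per-table writes work directly.
kafka_df = (
    spark.read.format("kafka")
    # Placeholder broker address and topic name.
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "messages")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

# Assumes the Kafka key carries the target table id (e.g. "T0".."T4").
messages = kafka_df.select(
    col("key").cast("string").alias("table_id"),
    col("value").cast("string").alias("payload"),
    col("timestamp").alias("event_time")
)

for row in messages.select("table_id").distinct().collect():
    table_id = row["table_id"]
    messages.filter(col("table_id") == table_id) \
        .write.mode("append") \
        .parquet(f"/mnt/parquet/{table_id}")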


Solution

  • The code does not persist anything because of two fundamental issues in how the micro-batch is processed. First, mapPartitions is a lazy transformation: no action is ever called on the RDD it returns inside process_batch, so process_partition never actually runs. Second, even if it did run, the function makes driver-side DataFrame calls (df.first(), filter with col) from inside worker code, where the iterator yields plain Row objects rather than DataFrames.

    Try this instead: collect the distinct table_id values on the driver inside foreachBatch, then write each filtered subset out as a normal batch DataFrame.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, concat, lit
    
    # Create Spark session
    spark = SparkSession.builder \
        .appName("SimulateKAFKAandMultipleSinks") \
        .getOrCreate()
    
    # Create a streaming DataFrame
    rate_stream = spark.readStream \
        .format("rate") \
        .option("rowsPerSecond", 1) \
        .load()
    
    # Transform the data
    message_stream = rate_stream.select(
        (rate_stream["value"] + 1000).alias("message_id"),
        rate_stream["timestamp"].alias("event_time"),
        concat(lit("T"), (rate_stream["value"] % 5).cast("string")).alias("table_id")
    )
    
    # Write to multiple sinks based on table_id
    def write_to_multiple_sinks(batch_df, batch_id):
        unique_table_ids = batch_df.select("table_id").distinct().collect()
        for row in unique_table_ids:
            table_id = row["table_id"]
            table_path = f"/mnt/parquet/{table_id}"  # Replace with actual storage path
            table_df = batch_df.filter(col("table_id") == table_id)
            table_df.write.mode("append").parquet(table_path)
    
    query = message_stream.writeStream \
        .foreachBatch(write_to_multiple_sinks) \
        .outputMode("append") \
        .option("checkpointLocation", "/mnt/parquet/checkpoints/") \
        .start()
    
    query.awaitTermination()
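
    As a further note: because foreachBatch hands you an ordinary batch DataFrame, a single partitioned write is often enough. The sketch below (a hypothetical write_partitioned helper, dropped into .foreachBatch() in place of write_to_multiple_sinks) lets partitionBy("table_id") create one sub-directory per table id (table_id=T0, table_id=T1, ...) under a common root; the root path is a placeholder like the one above.

    def write_partitioned(batch_df, batch_id):
        # One append per micro-batch; Spark splits the output by table_id under the root path.
        batch_df.write \
            .mode("append") \
            .partitionBy("table_id") \
            .parquet("/mnt/parquet/all_tables")  # Placeholder root; one folder per table_id appears beneath it

    # After letting the stream run for a minute, stop the query and verify persistence:
    # spark.read.parquet("/mnt/parquet/all_tables").groupBy("table_id").count().show()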