apache-spark, pyspark, spark-streaming, azure-service-fabric, delta-live-tables

How to write in parallel in Spark Structured Streaming?


I have multiple DataFrames, and at the end I write each of them to a Delta table.

There are 5 DataFrames I need to write into 5 Delta tables in parallel. Can we do this in one notebook?

I am writing output like this:

query_a_b = metadata_df1.writeStream \
    .format("delta") \
    .option("checkpointLocation", "Files/checkpoint/event_hub_df") \
    .outputMode("append") \
    .start("Tables/metadata_df1")
 
query_a_c = state_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "Files/checkpoint1/event_hub_df") \
    .outputMode("append") \
    .start("Tables/state_df")
 
query_a_d = cols.writeStream \
    .format("delta") \
    .option("checkpointLocation", "Files/checkpoint2/event_hub_df") \
    .outputMode("append") \
    .start("Tables/cols")
 
query_a_e = metadata_df2.writeStream \
    .format("delta") \
    .option("checkpointLocation", "Files/checkpoint3/event_hub_df") \
    .outputMode("append") \
    .start("Tables/metadata_df2")
 
query_a_f = metadata_df3.writeStream \
    .format("delta") \
    .option("checkpointLocation", "Files/checkpoint4/event_hub_df") \
    .outputMode("append") \
    .start("Tables/metadata_df3")

Likewise, I have 30 DataFrames that should be written in parallel to 30 Delta tables.

Will these queries run in parallel?


Solution

  • You can start any number of queries in a single SparkSession. They will all run concurrently, sharing the cluster's resources.

    You can use spark.streams to get the StreamingQueryManager (see the Scala/Java/Python API docs), which can be used to manage the currently active queries.

    spark = ...  # spark session
    
    spark.streams.active  # get the list of currently active streaming queries
    
    spark.streams.get(id)  # get a query object by its unique id
    
    spark.streams.awaitAnyTermination()  # block until any one of them terminates
    

    You can use the same approach if you have 30 DataFrames that need to be written to 30 Delta tables in parallel: create 30 separate writeStream queries, one per DataFrame, as sketched below.
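
    For that many tables it can be cleaner to drive the pattern from a loop rather than hand-writing 30 blocks. The following is a minimal sketch, assuming the streaming DataFrames are collected in a dictionary keyed by a (hypothetical) target table name and that every query gets its own checkpoint directory:

    # spark session and the streaming DataFrames are assumed to exist already
    streaming_dfs = {
        "metadata_df1": metadata_df1,
        "state_df": state_df,
        "cols": cols,
        # ... add the remaining DataFrames here
    }

    queries = []
    for name, df in streaming_dfs.items():
        q = (df.writeStream
               .format("delta")
               .option("checkpointLocation", f"Files/checkpoint/{name}")  # each query needs a unique checkpoint location
               .outputMode("append")
               .start(f"Tables/{name}"))
        queries.append(q)

    # keep the driver/notebook alive while all queries run concurrently
    spark.streams.awaitAnyTermination()

    Each start() call returns immediately and the query keeps running in the background, so the loop itself does not block; how much the 30 queries actually overlap is then bounded only by the cluster's resources.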