python, apache-spark, pyspark, azure-synapse, azure-notebooks

Save Spark dataframe to a dynamic path in ADLS using a Synapse notebook


I am trying to use a Synapse notebook with PySpark to read a bunch of parquet files and reprocess them into a different folder structure, "YYYY/MM/YYYY-MM-DD.parquet", based on the created_date of each row.

When I just use a literal path, everything works fine:

df1.write.format("parquet").mode("overwrite").save("<file path to ADLS>")

I've tried passing in the path dynamically various ways, and I always seem to get the same error.

Adding a column that contains the path:

df1.write.format("parquet").mode("overwrite").save(col("FilePath"))

Trying to concatenate the string:

df1.write.format("parquet").mode("overwrite").save(concat("<file path to ADLS>",year("created_date")))

Error: TypeError: Column is not iterable

If I try to concatenate multiple strings, it works, but that doesn't make it dynamic.

df1.write.format("parquet").mode("overwrite").save("<file path to ADLS>" + "test")
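
From what I can tell, save() expects a plain Python string path, while functions like concat() and year() return Column expressions, which would explain the TypeError:

from pyspark.sql.functions import concat, lit

# concat() builds a Column expression, not a Python string --
# save() needs a str, hence "Column is not iterable"
print(type(concat(lit("a"), lit("b"))))  # <class 'pyspark.sql.column.Column'>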

I was able to achieve this successfully using an ADF Data Flow. I was trying to do it via a notebook so I can see the performance difference between the two.


Solution

  • You cannot save a dataframe to multiple output paths in a single write. What you can do instead is either partition the data and save it, or create a dataframe for each distinct path and save each one separately.

    Below are the steps.

    First, partitioning the data and saving:

    from pyspark.sql.functions import col, concat, dayofmonth, lit, month, year

    # Derive partition columns from created_date and build the per-day file name
    df2 = df.withColumn("year", year("created_date")) \
        .withColumn("month", month("created_date")) \
        .withColumn("day", dayofmonth("created_date")) \
        .withColumn("file_name", concat(col("year"), lit("-"), col("month"), lit("-"), col("day")))

    # partitionBy writes one key=value folder per distinct combination under output_path
    df2.write.format("parquet").mode("overwrite").partitionBy("year", "month", "file_name").save(output_path)
    

    You will get files like this.

    [screenshot of the resulting year=/month=/file_name= folder structure]
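
    Note that partitionBy produces key=value folder names (year=2023/month=1/...) rather than the bare YYYY/MM/ layout, and month() / dayofmonth() return unpadded integers. A minimal variant, assuming you want zero-padded YYYY-MM-DD file names, derives the columns with date_format instead:

    from pyspark.sql.functions import date_format

    # date_format pads month and day, so file_name matches YYYY-MM-DD exactly
    df2 = df.withColumn("year", date_format("created_date", "yyyy")) \
        .withColumn("month", date_format("created_date", "MM")) \
        .withColumn("file_name", date_format("created_date", "yyyy-MM-dd"))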

    Or

    By creating a dataframe for each distinct path and saving each one:

    import os
    from pyspark.sql.functions import col, concat, lit

    # Build the relative output path for each row: year/month/file_name.parquet
    df3 = df2.withColumn("path", concat(col("year"), lit("/"), col("month"), lit("/"), col("file_name"), lit(".parquet")))

    # Collect the distinct paths to the driver and write each subset separately
    for pt in df3.select("path").distinct().collect():
        sub_df = df3.filter(df3.path == pt["path"])
        sub_path = os.path.join("adlsmnt2", pt["path"])
        sub_df.write.format("parquet").mode("overwrite").save(sub_path)
    

    Output:

    [screenshot of the resulting per-path output files]
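
    One performance note on the loop approach: each filter and write re-evaluates df3 from the source data, so caching it first avoids repeated scans. A small sketch, assuming df3 fits in cluster memory:

    # Cache df3 so the per-path filters don't rescan the source each iteration
    df3.cache()

    for pt in df3.select("path").distinct().collect():
        df3.filter(df3.path == pt["path"]) \
            .write.format("parquet").mode("overwrite").save(os.path.join("adlsmnt2", pt["path"]))

    df3.unpersist()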