azure, pyspark, databricks, azure-databricks, databricks-sql

In Databricks, code is taking a long time to load a DataFrame into a SQL Server (SSMS) table


df_CorpBond = spark.read.format("parquet").option("header", "true").load(f"/mnt/{container_name}/raw_data/dfl.corporate.parquet")
df_CorpBond.repartition(100).write\
    .format("jdbc")\
    .option("url", url_connector)\
    .option("dbtable", "MarkIt_CorpBonds")\
    .option("user", user)\
    .option("password", pwd)\
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
    .option("numPartitions", 100)\
    .option("batchsize", 100000)\
    .mode("overwrite")\
    .save()

This is my code for reading the Blob files and loading them into the SQL Server (SSMS) table. The job was taking almost 2 1/2 hours for a data size of 2.3 GB.

I even tried a repartition value of 20, and the job still takes a long time. Sometimes the first job completes within 2 minutes, then a new job starts immediately with two stages: the first stage contains 23 tasks and the second contains 20 (based on the partition number). This second, partition-based stage takes a long time to complete its tasks.


Solution

  • Repartitioning into a high number of partitions causes performance issues of its own, and if the target database cannot handle that many parallel inserts effectively, the write slows down even further.
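
    With a JDBC write, each output partition opens its own connection and inserts in parallel, so repartition(100) asks SQL Server to absorb up to 100 concurrent insert streams. A quick sanity check (a sketch that reuses df_CorpBond from the question):

    # Sketch: each JDBC output partition becomes one connection, so a very
    # high partition count can overwhelm SQL Server instead of speeding it up.
    print("partitions after read:", df_CorpBond.rdd.getNumPartitions())
    print("partitions after repartition(100):",
          df_CorpBond.repartition(100).rdd.getNumPartitions())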

    Consider converting your Parquet data to Delta Lake format. Delta Lake provides optimizations and ACID transactions:

    1st Approach:

    Read the Parquet file

    df_CorpBond = spark.read.format("parquet").option("header", "true").load(f"/mnt/{container_name}/raw_data/dfl.corporate.parquet")
    

    Write the DataFrame to a Delta table

    df_CorpBond.write.format("delta").mode("overwrite").save(f"/mnt/{container_name}/delta/MarkIt_CorpBonds")
    

    Learn more about Delta vs. Parquet: A Deep Dive into Big Data Storage Solutions
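
    If the data ultimately still needs to land in the SQL Server table, one option (a sketch reusing the connection variables url_connector, user and pwd from the question) is to read the Delta table back and push it over JDBC with a small partition count:

    # Sketch: load from the Delta table and write to SQL Server using only a
    # handful of parallel connections instead of 100.
    df_delta = spark.read.format("delta") \
        .load(f"/mnt/{container_name}/delta/MarkIt_CorpBonds")

    df_delta.coalesce(8).write \
        .format("jdbc") \
        .option("url", url_connector) \
        .option("dbtable", "MarkIt_CorpBonds") \
        .option("user", user) \
        .option("password", pwd) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("batchsize", 10000) \
        .mode("overwrite") \
        .save()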

    2nd Approach:

    df_CorpBond.coalesce(10).write \
        .format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", table_name) \
        .option("user", connectionProperties["user"]) \
        .option("password", connectionProperties["password"]) \
        .option("driver", connectionProperties["driver"]) \
        .option("batchsize", 10000) \
        .mode("overwrite") \
        .save()
    

    In the above code I have used coalesce instead of repartition to reduce the number of partitions, as coalesce avoids a full shuffle.
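
    The difference is easy to verify on any DataFrame (an illustrative sketch, not tied to the question's data):

    # Sketch: coalesce merges existing partitions locally (no shuffle) and can
    # only reduce the count; repartition always triggers a full shuffle.
    demo = spark.range(0, 1000).repartition(100)

    print(demo.coalesce(10).rdd.getNumPartitions())     # 10, no shuffle
    print(demo.coalesce(200).rdd.getNumPartitions())    # still 100; coalesce cannot increase partitions
    print(demo.repartition(10).rdd.getNumPartitions())  # 10, but via a full shuffle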

    Learn more about PySpark Repartition() vs Coalesce()