apache-spark, apache-spark-sql, parquet, azure-synapse

Spark Large single Parquet file to Delta Failure with Spark SQL


Cluster details: 5 executor nodes, each with 16 cores and 112 GB of RAM.

Parquet file details: a single large, wide Parquet file.

Although this is a wide file, I'm trying to write out only a subset of the columns and all rows to a new delta table using Spark SQL.

I've noticed that even something as simple as a count(*) causes a failure. I've tried to narrow this down to the smallest increment I could think of, and it still fails. For example, the following code, which writes only 10 rows of the smallest column, fails:

CREATE TABLE db.my_table
USING DELTA
AS
SELECT  my_teeny_tiny_column
FROM parquet.`abfss://mycontainer@mystorageaccount.dfs.core.windows.net/files/myfile.parquet`
LIMIT 10

As I mentioned, I don't get any kind of log that I can read through; all I get back from the dead executor is Container from a bad node: container_1722263226188_0007_01_000003 on host: vm-######. Exit status: 143. Diagnostics: [2024-07-29 20:43:20.686]Container killed on request

I've also tried to write the same file from parquet to parquet, but I get the same behavior.
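
In PySpark terms, that attempt was roughly the following (the target table name here is just illustrative):

spark.sql("""
CREATE TABLE db.my_table_parquet
USING PARQUET
AS
SELECT my_teeny_tiny_column
FROM parquet.`abfss://mycontainer@mystorageaccount.dfs.core.windows.net/files/myfile.parquet`
LIMIT 10
""")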

I have also noticed that if I run just the query portion of this, I can display the result in the notebook, but for some reason trying to write it doesn't work.
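
Roughly what I mean (display here is the notebook's built-in display helper):

query = """
SELECT my_teeny_tiny_column
FROM parquet.`abfss://mycontainer@mystorageaccount.dfs.core.windows.net/files/myfile.parquet`
LIMIT 10
"""
display(spark.sql(query))                                           # shows the 10 rows fine
spark.sql(query).write.format("delta").saveAsTable("db.my_table")   # this is what dies with exit status 143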

Has anyone had a problem like this that they were able to solve?


Solution

  • The error message

    Container from a bad node and Exit status: 143

    This issue typically points to a memory-related problem in Apache Spark when handling big data: YARN kills a container (exit code 143) when it exceeds its allowed memory.

    To resolve memory issues in Spark effectively, it helps to understand the underlying processes. Tuning three key parameters (cores, memory, and memory overhead) can significantly improve the chances of your job succeeding.

    memoryOverhead is extra off-heap memory allocated to each container (whether the driver or an executor) on top of its heap; the container can run until its total memory usage reaches the heap plus this overhead, after which YARN kills it.
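
    As a rough illustration of that limit, here is a small sketch using the executor values configured below (numbers come from the configuration in this answer, not from your job):

    # The per-container limit YARN enforces is heap + overhead
    executor_memory_gb = 20       # spark.executor.memory ("20G" below)
    memory_overhead_gb = 2        # spark.executor.memoryOverhead ("2G" below)
    container_limit_gb = executor_memory_gb + memory_overhead_gb
    print(container_limit_gb)     # 22 GB; exceeding this triggers the exit code 143 kill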

    Since you mentioned 5 executor nodes, each with 16 cores and 112 GB of RAM, you can configure your Spark settings as below:

    from pyspark.sql import SparkSession

    # Session tuned for 5 nodes x 16 cores x 112 GB RAM:
    # 4 executors per node -> 20 executors, each with 4 cores, a 20 GB heap and 2 GB overhead.
    spark = SparkSession.builder \
        .appName("Optimized_Spark_dilip") \
        .config("spark.sql.shuffle.partitions", "4") \
        .config("spark.executor.memory", "20G") \
        .config("spark.executor.cores", "4") \
        .config("spark.driver.memory", "4G") \
        .config("spark.executor.memoryOverhead", "2G") \
        .config("spark.executor.instances", "20") \
        .config("spark.default.parallelism", "128") \
        .getOrCreate()

    # Read the source Parquet file, select the subset, and write it out as a Delta table.
    df = spark.read.parquet("<path/to/sample.parquet>")
    df_subset = df.select("my_teeny_tiny_column").limit(10)
    df_subset.write.format("delta").saveAsTable("db.my_table")
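
    If the write succeeds, a quick sanity check (table name from the question) could look like this:

    # Expect the 10 rows selected above
    spark.sql("SELECT COUNT(*) FROM db.my_table").show()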
    

    Reference: Spark – Container exited with a non-zero exit code 143

    Here is how the configuration above was derived:

    For spark.executor.memory: budget roughly 100 GB of each node's 112 GB for executors (leaving headroom for the OS and other services), which gives

    $\frac{100\ \text{GB}}{4\ \text{executors per node}} = 25\ \text{GB per executor}$

    (the configuration above uses 20 GB plus 2 GB of overhead per executor).

    For spark.executor.cores: allocate 4 cores per executor (16 cores per node / 4 executors per node).

    For spark.executor.memoryOverhead: reserve extra off-heap memory per executor on top of its heap (2 GB in the configuration above).

    For spark.executor.instances: the total number of executors is

    $5\ \text{nodes} \times 4\ \text{executors per node} = 20\ \text{executors}$
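
    For completeness, here is a small sketch of the sizing arithmetic above, using the cluster specs from the question (the 100 GB usable-memory figure is an assumption that leaves headroom for the OS and other services):

    # Sizing arithmetic behind the configuration above
    nodes = 5
    cores_per_node = 16
    ram_per_node_gb = 112
    executors_per_node = 4
    usable_memory_gb = 100                                        # assumed: ~12 GB left per node for the OS
    executor_cores = cores_per_node // executors_per_node         # 4  -> spark.executor.cores
    executor_memory_gb = usable_memory_gb // executors_per_node   # 25 -> per-executor memory budget
    total_executors = nodes * executors_per_node                  # 20 -> spark.executor.instances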