python, apache-spark, amazon-s3, pyspark, delta-lake

Write Delta format to Data Lake in AWS S3


My goal is to upload data in Delta format to an AWS S3 data lake. I should mention that I have managed to upload parquet files to this data lake, and I can also write Delta format on my local machine, but when I try to write Delta format to S3 I get an error.

So, my code is the following

import findspark
from pyspark.sql import SparkSession
import pandas as pd

findspark.find()
findspark.init()

import boto3

# Initialize a session using the AWS SDK for Python (boto3)
session = boto3.Session(profile_name='default')

# Get the AWS credentials
credentials = session.get_credentials()

spark = SparkSession.builder.appName('Session2').config('spark.master', 'local[4]') \
                            .config("spark.hadoop.fs.s3a.access.key", credentials.access_key) \
                            .config("spark.hadoop.fs.s3a.secret.key", credentials.secret_key) \
                            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
                            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6,org.apache.hadoop:hadoop-common:3.3.6,io.delta:delta-core_2.12:2.4.0") \
                            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
                            .config("spark.sql.legacy.parquet.int96RebaseModeInWrite","CORRECTED") \
                            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
                            .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
                            .getOrCreate()      

df = ... # some dataframe

df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").option("mergeSchema", "true").save('s3a://bucket/dts/deltatable/')

I have this error:

"name": "Py4JJavaError",
    "message": "An error occurred while calling o116.save.\n: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 172) (host.docker.internal executor driver): java.lang.NoSuchMethodError: 'java.lang.Object org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(org.apache.hadoop.fs.statistics.DurationTracker, org.apache.hadoop.util.functional.CallableRaisingIOE)'\r\n\tat org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration(Invoker.java:147)\r\n\tat org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:282)\r\n\tat org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:435)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:284)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:408)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:404).....

Also, I have noticed that with this configuration I can no longer write parquet files to S3 (contrary to what I said before), although I confirm that I can still write Delta format on my local machine. But if I change my Spark configuration to the following, writing parquet to S3 works again:

spark = SparkSession.builder.appName('Session2').config('spark.master', 'local[4]') \
                            .config("spark.hadoop.fs.s3a.access.key", credentials.access_key) \
                            .config("spark.hadoop.fs.s3a.secret.key", credentials.secret_key) \
                            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6,org.apache.hadoop:hadoop-common:3.3.6") \
                            .getOrCreate()

By the way, the error from that parquet attempt with the Delta configuration is this:

Py4JJavaError: An error occurred while calling o130.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 177) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)

Another thing that may help: if I check in S3 I do find the folder I was trying to upload the parquet data to, but there are no parquet files inside it because of the error I mentioned earlier.


So I understand that my configuration is wrong, but I do not know why.

Also, I'm using Spark 3.4.1, Scala 2.12, Delta-Spark 2.4.0.


Solution

  • I'm still getting started with Delta Lake.
    Anyway, my current environment is as follows.

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
          /_/

    Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 11.0.20.1
    Branch HEAD
    Compiled by user centos on 2023-06-19T23:01:01Z
    

    A description of which versions to specify as packages is provided in the official documentation.

    Use the following command to launch a Spark shell with Delta Lake and S3 support (assuming you use Spark 3.2.1 which is pre-built for Hadoop 3.3.1):

    bin/spark-shell \
      --packages io.delta:delta-core_2.12:2.4.0,org.apache.hadoop:hadoop-aws:3.3.1 \
      --conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \
      --conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key>
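
    If you work from PySpark instead of spark-shell, here is a minimal sketch of the same setup. It assumes the delta-spark pip package is installed; its configure_spark_with_delta_pip helper appends the matching delta-core package to spark.jars.packages for you, and the hadoop-aws version simply follows the docs' example above.

    from pyspark.sql import SparkSession
    from delta import configure_spark_with_delta_pip

    # Builder with the Delta SQL extension and catalog enabled, plus S3A credentials.
    builder = (
        SparkSession.builder.appName("delta-s3")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.hadoop.fs.s3a.access.key", "<your-s3-access-key>")
        .config("spark.hadoop.fs.s3a.secret.key", "<your-s3-secret-key>")
    )

    # Pull in hadoop-aws next to the delta-core package the helper injects.
    spark = configure_spark_with_delta_pip(
        builder, extra_packages=["org.apache.hadoop:hadoop-aws:3.3.1"]
    ).getOrCreate()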
    

    And here is what I've tested:

    Delta Lake version | Apache Spark version | hadoop-common | aws-java-sdk | ... | Desc
    3.1.0              | 3.5.1                | 3.3.4         | 1.12.262     |     | delta-core --> delta-spark
    2.4.x              | 3.4.x                | 3.3.4         | 1.12.262     |     |
    2.3.x              | 3.3.x                | 3.3.3         | 1.11.1034    |     |

    So, in your case (Spark 3.4.1, Scala 2.12, delta-core 2.4.0), change the package versions as below. This should work for you:

    .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4,io.delta:delta-core_2.12:2.4.0") 
    

    Alternatives

    Here's the article that guided me:
    Combining Delta Lake With MinIO for Multi-Cloud Data Lakes

    Unfortunately, the documentation does not provide detailed information about specific versions; refer to the version table above.

    That's it! I hope this works for you.

    I also got help from this link: Pyspark S3 error: java.lang.NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException


    Update

    We can find matching versions from the Hadoop release notes: as of Hadoop 3.4.0, the AWS library was upgraded to SDK v2 (Hadoop Common, HADOOP-18073 "S3A: Upgrade AWS SDK to V2").

    Please note that in my case I hit the error below with that combination and stopped there:

    Caused by: java.lang.ClassNotFoundException: software.amazon.awssdk.core.exception.SdkException
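
    If you do want to go down the Hadoop 3.4.0 / AWS SDK v2 route, that ClassNotFoundException usually means the v2 SDK bundle jar is not on the classpath. A hypothetical sketch, not something I have verified: add software.amazon.awssdk:bundle next to hadoop-aws 3.4.0 (the bundle version below is an assumption; match it to what the Hadoop 3.4.0 release was built against), and keep the Spark/Delta versions aligned per the table above.

    from pyspark.sql import SparkSession

    # Hypothetical sketch: hadoop-aws 3.4.0 expects the AWS SDK v2 bundle, which is
    # where software.amazon.awssdk.core.exception.SdkException lives.
    spark = (
        SparkSession.builder.appName('Session2')
        .config("spark.jars.packages",
                "org.apache.hadoop:hadoop-aws:3.4.0,"
                "software.amazon.awssdk:bundle:2.23.19")  # bundle version is an assumption
        .getOrCreate()
    )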