My goal is to write data in Delta format to an AWS S3 data lake. I should mention that I have managed to upload Parquet files to this data lake, and I can also write Delta format on my local machine, but when I try to write Delta format to S3 I get an error.
So, my code is the following:
import findspark
from pyspark.sql import SparkSession
import pandas as pd
findspark.find()
findspark.init()
import boto3
# Initialize a session using the AWS SDK for Python (boto3)
session = boto3.Session(profile_name='default')
# Get the AWS credentials
credentials = session.get_credentials()
spark = SparkSession.builder.appName('Session2').config('spark.master', 'local[4]') \
.config("spark.hadoop.fs.s3a.access.key", credentials.access_key) \
.config("spark.hadoop.fs.s3a.secret.key", credentials.secret_key) \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6,org.apache.hadoop:hadoop-common:3.3.6,io.delta:delta-core_2.12:2.4.0") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.sql.legacy.parquet.int96RebaseModeInWrite","CORRECTED") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
.getOrCreate()
df = ... # some dataframe
df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").option("mergeSchema", "true").save('s3a://bucket/dts/deltatable/')
I have this error:
"name": "Py4JJavaError",
"message": "An error occurred while calling o116.save.\n: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 172) (host.docker.internal executor driver): java.lang.NoSuchMethodError: 'java.lang.Object org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(org.apache.hadoop.fs.statistics.DurationTracker, org.apache.hadoop.util.functional.CallableRaisingIOE)'\r\n\tat org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration(Invoker.java:147)\r\n\tat org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:282)\r\n\tat org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:435)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:284)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:408)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)\r\n\tat org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:404).....
Also, I have noticed that with this configuration I can no longer write Parquet files to S3 (contrary to what I said above), although I can confirm that I can still write Delta format on my local machine. But if I change my Spark configuration to the following, writing Parquet to S3 works:
spark = SparkSession.builder.appName('Session2').config('spark.master', 'local[4]') \
.config("spark.hadoop.fs.s3a.access.key", credentials.access_key) \
.config("spark.hadoop.fs.s3a.secret.key", credentials.secret_key) \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.6,org.apache.hadoop:hadoop-common:3.3.6") \
.getOrCreate()
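For reference, the Parquet write that succeeds with this simpler configuration is just the plain DataFrame writer; this is my reconstruction, and the bucket/prefix is a placeholder, not my real path:
# With the simpler configuration above, a plain Parquet write to S3 succeeds.
# The path below is a placeholder.
df.write.mode("overwrite").parquet("s3a://bucket/dts/parquettable/")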
By the way, the error from that Parquet attempt with the Delta configuration is this:
Py4JJavaError: An error occurred while calling o130.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 177) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
Another thing that might help: if I check in S3 I do find the folder the write was creating, but there are no Parquet files inside it because of the error I mentioned above.
So I understand that my configuration is wrong, but I don't know why.
Also, I'm using Spark 3.4.1, Scala 2.12, and delta-spark 2.4.0.
I'm still in the getting-started phase with Delta Lake.
Anyway, my current environment is as follows.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.4.1
/_/
Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 11.0.20.1
Branch HEAD
Compiled by user centos on 2023-06-19T23:01:01Z
The official documentation describes which versions to specify as packages.
Use the following command to launch a Spark shell with Delta Lake and S3 support (assuming you use Spark 3.2.1 which is pre-built for Hadoop 3.3.1):
bin/spark-shell \
--packages io.delta:delta-core_2.12:2.4.0,org.apache.hadoop:hadoop-aws:3.3.1 \
--conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \
--conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key>
And here is what I've tested:
Delta Lake version | Apache Spark version | hadoop-common | aws-java-sdk ... | Desc |
---|---|---|---|---|
3.1.0 | 3.5.1 | 3.3.4 | 1.12.262 | delta-core --> delta-spark |
2.4.x | 3.4.x | 3.3.4 | 1.12.262 | |
2.3.x | 3.3.x | 3.3.3 | 1.11.1034 | |
So, in your case (Spark 3.4.1, Scala 2.12, delta-core 2.4.0), change the package versions as below. This should work for you:
.config("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4,io.delta:delta-core_2.12:2.4.0")
Here's the article that guided me:
Combining Delta Lake With MinIO for Multi-Cloud Data Lakes
Unfortunately, the documentation does not provide detailed information about specific versions, so refer to the version table above.
Download the JARs required for execution to $SPARK_HOME/jars (see the table above and the commands below):
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.3.4/hadoop-common-3.3.4.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.12.262/aws-java-sdk-dynamodb-1.12.262.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-sts/1.12.262/aws-java-sdk-sts-1.12.262.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.12.262/aws-java-sdk-s3-1.12.262.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.12.262/aws-java-sdk-core-1.12.262.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.12.262/aws-java-sdk-1.12.262.jar
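If you would rather not copy these files into $SPARK_HOME/jars, an alternative (my own suggestion, not part of the original answer) is to point spark.jars at wherever the downloads landed; the directory below is a placeholder:
from pyspark.sql import SparkSession

jar_dir = "/path/to/downloaded/jars"  # placeholder: wherever the wget commands saved the files
local_jars = ",".join([
    f"{jar_dir}/hadoop-aws-3.3.4.jar",
    f"{jar_dir}/hadoop-common-3.3.4.jar",
    f"{jar_dir}/aws-java-sdk-s3-1.12.262.jar",
    f"{jar_dir}/aws-java-sdk-core-1.12.262.jar",
    f"{jar_dir}/aws-java-sdk-dynamodb-1.12.262.jar",
    f"{jar_dir}/aws-java-sdk-sts-1.12.262.jar",
])

spark = (
    SparkSession.builder.appName("Session2")
    .config("spark.jars", local_jars)                                 # load the locally downloaded JARs
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")  # Delta can still be resolved from Maven
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)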
That's it! I hope this works for you.
I also got help from this link: Pyspark S3 error: java.lang.NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException
You can find the matching versions in the Hadoop JIRA: as of Hadoop 3.4.0, the AWS library was upgraded to SDK v2 (Hadoop Common HADOOP-18073, "S3A: Upgrade AWS SDK to V2").
Please note that in my case I hit the following error there and stopped:
Caused by: java.lang.ClassNotFoundException: software.amazon.awssdk.core.exception.SdkException
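As a quick sanity check (my own addition, not from the answer or the linked issue), you can print the Hadoop version your Spark runtime was actually built against, via the py4j-internal _jvm attribute, and make sure the hadoop-aws/hadoop-common artifacts you pull in match it exactly; a mismatch is what produces version errors like the NoSuchMethodError in the question:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Hadoop version bundled with this Spark build (3.3.4 for the prebuilt Spark 3.4.1);
# hadoop-aws / hadoop-common on spark.jars.packages should use exactly this version.
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())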