I'm trying to write Parquet data to an AWS S3 directory with Apache Spark. I'm working on a local Windows 10 machine without Spark or Hadoop installed; instead, I added them as SBT dependencies (Hadoop 3.2.1, Spark 2.4.5). My SBT build is below:
scalaVersion := "2.11.11"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "2.4.5",
"org.apache.spark" %% "spark-hadoop-cloud" % "2.3.2.3.1.0.6-1",
"org.apache.hadoop" % "hadoop-client" % "3.2.1",
"org.apache.hadoop" % "hadoop-common" % "3.2.1",
"org.apache.hadoop" % "hadoop-aws" % "3.2.1",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.11.704"
)
dependencyOverrides ++= Seq(
"com.fasterxml.jackson.core" % "jackson-core" % "2.11.0",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.11.0",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.0"
)
resolvers ++= Seq(
"apache" at "https://repo.maven.apache.org/maven2",
"hortonworks" at "https://repo.hortonworks.com/content/repositories/releases/",
)
I use the S3A Staging Directory Committer as described in the Hadoop and Cloudera documentation. I'm also aware of these two questions on Stack Overflow and used them for proper configuration:
I have added all the required configuration (to my understanding), including the last two settings, which are specific to Parquet:
val spark = SparkSession.builder()
.appName("test-run-s3a-commiters")
.master("local[*]")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")
.config("spark.hadoop.fs.s3a.connection.maximum", "100")
.config("spark.hadoop.fs.s3a.committer.name", "directory")
.config("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
.config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append")
.config("spark.hadoop.fs.s3a.committer.staging.unique-filenames", "true")
.config("spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads", "true")
.config("spark.hadoop.fs.s3a.buffer.dir", "tmp/")
.config("spark.hadoop.fs.s3a.committer.staging.tmp.path", "hdfs_tmp/")
.config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
.config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
.config("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
.config("spark.sql.parquet.output.committer.class", "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
.getOrCreate()
spark.sparkContext.setLogLevel("info")
From the logs I can see that StagingCommitter is actually applied. I can also see intermediate data in my local filesystem under the specified paths, and there is no _temporary directory in S3 during execution, as there would be with the default FileOutputCommitter.
Then I run some simple code to write test data to the S3 bucket:
import spark.implicits._
val sourceDF = spark
.range(0, 10000)
.map(id => {
Thread.sleep(10)
id
})
sourceDF
.write
.format("parquet")
.save("s3a://my/test/bucket/")
(I use Thread.sleep to simulate some processing and to have a little time to check the intermediate content of my local temp directory and the S3 bucket.)
However, I get a java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat error during the task commit attempt.
Below is an excerpt of the logs (reduced to 1 executor) and the error stack trace.
20/05/09 15:13:18 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 15000
20/05/09 15:13:18 INFO StagingCommitter: Starting: Task committer attempt_20200509151301_0000_m_000000_0: needsTaskCommit() Task attempt_20200509151301_0000_m_000000_0
20/05/09 15:13:18 INFO StagingCommitter: Task committer attempt_20200509151301_0000_m_000000_0: needsTaskCommit() Task attempt_20200509151301_0000_m_000000_0: duration 0:00.005s
20/05/09 15:13:18 INFO StagingCommitter: Starting: Task committer attempt_20200509151301_0000_m_000000_0: commit task attempt_20200509151301_0000_m_000000_0
20/05/09 15:13:18 INFO StagingCommitter: Task committer attempt_20200509151301_0000_m_000000_0: commit task attempt_20200509151301_0000_m_000000_0: duration 0:00.019s
20/05/09 15:13:18 ERROR Utils: Aborting task
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Ljava/lang/String;)Lorg/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat;
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.getStat(NativeIO.java:460)
at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfoByNativeIO(RawLocalFileSystem.java:821)
at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:735)
at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:703)
at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:52)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2091)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2071)
at org.apache.hadoop.fs.FileSystem$5.hasNext(FileSystem.java:2190)
at org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles(S3AUtils.java:1295)
at org.apache.hadoop.fs.s3a.S3AUtils.flatmapLocatedFiles(S3AUtils.java:1333)
at org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter(S3AUtils.java:1350)
at org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.getTaskOutput(StagingCommitter.java:385)
at org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.commitTask(StagingCommitter.java:641)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.commitTask(PathOutputCommitProtocol.scala:220)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/05/09 15:13:18 ERROR Utils: Aborting task
As far as I understand, the configuration is correct. The error is probably caused by a version incompatibility or by my local environment settings.
The provided code works as expected for ORC and CSV, but not for Parquet.
What could cause this error, and how can I resolve it?
For everyone who comes here, I found the solution. As expected, the problem is not related to S3A output committers or library dependencies.
The UnsatisfiedLinkError on the Java native method was raised because of a version incompatibility between the Hadoop version in the SBT dependencies and winutils.exe (the HDFS wrapper) on my Windows machine.
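To know which winutils build to look for, it helps to print the Hadoop version that SBT actually put on the classpath. The snippet below is a minimal sketch, assuming hadoop-common is on the classpath as in the build above; the object name `HadoopVersionCheck` is made up for illustration. Note that a `true` result from the native-library check does not by itself prove that the DLL matches: a library built for another Hadoop version may still load and then fail on an individual native method, which is consistent with the `stat` failure in the trace.

```scala
import org.apache.hadoop.util.{NativeCodeLoader, VersionInfo}

// Hypothetical helper object, not part of Hadoop or Spark.
object HadoopVersionCheck {
  def main(args: Array[String]): Unit = {
    // Version of the hadoop-common jar resolved by SBT (3.2.1 in the build above).
    println(s"Hadoop on classpath: ${VersionInfo.getVersion}")
    // True only if the native hadoop library (hadoop.dll on Windows) was found and loaded.
    println(s"Native library loaded: ${NativeCodeLoader.isNativeCodeLoaded}")
  }
}
```

The printed version is the one the winutils.exe and hadoop.dll binaries should be built for.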
I downloaded the corresponding version from cdarlint/winutils and it all worked. LOL
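A quick pre-flight check can catch a missing or misplaced binary before the Spark job starts. The sketch below rests on a few assumptions: that hadoop.dll sits next to winutils.exe in the `bin` folder of the Hadoop home (which is how I understand the cdarlint/winutils repository to be laid out), and that the home is configured through the `hadoop.home.dir` system property or the `HADOOP_HOME` environment variable. The object and method names are hypothetical. It only checks that the files are present, not that their version matches.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper object, not part of Hadoop or Spark.
object WinutilsCheck {
  // Hadoop reads its home directory from the "hadoop.home.dir" system
  // property first and falls back to the HADOOP_HOME environment variable.
  def resolveHadoopHome(props: Map[String, String], env: Map[String, String]): Option[Path] =
    props.get("hadoop.home.dir")
      .orElse(env.get("HADOOP_HOME"))
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(p => Paths.get(p))

  // Names of the expected Windows binaries that are absent from <home>/bin.
  // Assumption: hadoop.dll is kept next to winutils.exe.
  def missingBinaries(hadoopHome: Path): Seq[String] = {
    val binDir = hadoopHome.resolve("bin")
    Seq("winutils.exe", "hadoop.dll")
      .filterNot(name => Files.isRegularFile(binDir.resolve(name)))
  }

  def main(args: Array[String]): Unit =
    resolveHadoopHome(sys.props.toMap, sys.env) match {
      case None =>
        println("Neither hadoop.home.dir nor HADOOP_HOME is set")
      case Some(home) =>
        val missing = missingBinaries(home)
        if (missing.isEmpty) println(s"Found winutils.exe and hadoop.dll under $home")
        else println(s"Missing under $home: ${missing.mkString(", ")}")
    }
}
```

If the environment variable is not set, calling `System.setProperty("hadoop.home.dir", ...)` with the local folder that contains `bin` before building the SparkSession should have the same effect; the exact path depends on where the binaries were unpacked.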