I'm writing to a webhdfs path, secured by Kerberos, from Spark.
Part of it is actually working, but it breaks down when writing Parquet files to the (web)hdfs location.
Authentication and authorization work, and the script creates the directory structure needed to store the Parquet files, but as soon as the actual writing starts, the OutputStreams begin to fail.
The Spark config looks like this (I made it a bit more verbose):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf

// Hadoop-side configuration, used for the Kerberos keytab login
val hadoopConfig = new Configuration()
hadoopConfig.set(FS_DEFAULT_NAME_KEY, "webhdfs://hadoop-host:14000/webhdfs/v1")
hadoopConfig.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos")
hadoopConfig.set(HADOOP_SECURITY_AUTHORIZATION, "true")

UserGroupInformation.setConfiguration(hadoopConfig)
UserGroupInformation.loginUserFromKeytab("principal@REALM", "path/to/keytab.keytab")

// Spark-side configuration with the same settings
new SparkConf()
  .setIfMissing("spark.master", "local[*]")
  .set("spark.yarn.keytab", "path/to/keytab.keytab")
  .set("spark.yarn.principal", "principal@REALM")
  .set("spark.hadoop." + FS_DEFAULT_NAME_KEY, "webhdfs://hadoop-host:14000/webhdfs/v1")
  .set("spark.hadoop." + HADOOP_SECURITY_AUTHENTICATION, "kerberos")
  .set("spark.hadoop." + HADOOP_SECURITY_AUTHORIZATION, "true")
And when I use write.parquet (or write.text) it does create the path as mentioned (e.g. with target /user/tom/dump/2018/06/11 it does create that directory path on hdfs), but when the job gets to the point where it is going to store the actual data, it fails with the error in the log below.
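For reference, this is roughly the shape of the failing write; sparkConf stands for the SparkConf built above, and the dummy Dataset stands in for my real data:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// The relative path is resolved against fs.defaultFS, i.e. the webhdfs URL
spark.range(100).toDF("id").write.parquet("/user/tom/dump/2018/06/11")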
...
11:43:56,668 INFO TaskSetManager:54 - Starting task 0.0 in stage 1.0 (TID 19, localhost, executor driver, partition 0, ANY, 7754 bytes)
11:43:56,668 INFO Executor:54 - Running task 0.0 in stage 1.0 (TID 19)
11:43:56,674 INFO deprecation:1129 - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
11:43:56,702 INFO ShuffleBlockFetcherIterator:54 - Getting 19 non-empty blocks out of 19 blocks
11:43:56,703 INFO ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 3 ms
11:43:56,713 INFO SQLHadoopMapReduceCommitProtocol:54 - Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
11:43:57,016 ERROR Utils:91 - Aborting task
java.io.IOException: Error writing request body to server
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3536)
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3519)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.spark.unsafe.types.UTF8String.writeTo(UTF8String.java:182)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.write(TextFileFormat.scala:163)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:392)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
11:43:57,186 WARN FileOutputCommitter:467 - Could not delete webhdfs://hadoop-host:14000/user/tom/dump/2018/06/11/_temporary/0/_temporary/attempt_20180611114356_0001_m_000000_0
11:43:57,187 ERROR FileFormatWriter:70 - Job job_20180611114356_0001 aborted.
11:43:57,188 WARN Utils:87 - Suppressing exception in catch: Unauthorized
org.apache.hadoop.security.AccessControlException: Unauthorized
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:334)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access$200(WebHdfsFileSystem.java:91)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathOutputStreamRunner$1.close(WebHdfsFileSystem.java:787)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.close(TextFileFormat.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:275)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1420)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
11:43:57,190 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 19)
org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error writing request body to server
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3536)
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3519)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.spark.unsafe.types.UTF8String.writeTo(UTF8String.java:182)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.write(TextFileFormat.scala:163)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:392)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
... 8 more
Suppressed: org.apache.hadoop.security.AccessControlException: Unauthorized
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:334)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access$200(WebHdfsFileSystem.java:91)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathOutputStreamRunner$1.close(WebHdfsFileSystem.java:787)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.close(TextFileFormat.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:275)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1420)
... 9 more
11:43:57,199 WARN TaskSetManager:66 - Lost task 0.0 in stage 1.0 (TID 19, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error writing request body to server
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3536)
at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3519)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.spark.unsafe.types.UTF8String.writeTo(UTF8String.java:182)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.write(TextFileFormat.scala:163)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:392)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
... 8 more
Suppressed: org.apache.hadoop.security.AccessControlException: Unauthorized
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:334)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access$200(WebHdfsFileSystem.java:91)
at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathOutputStreamRunner$1.close(WebHdfsFileSystem.java:787)
at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.close(TextFileFormat.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:275)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1420)
... 9 more
11:43:57,200 ERROR TaskSetManager:70 - Task 0 in stage 1.0 failed 1 times; aborting job
11:43:57,201 INFO TaskSchedulerImpl:54 - Removed TaskSet 1.0, whose tasks have all completed, from pool default
11:43:57,203 INFO TaskSchedulerImpl:54 - Cancelling stage 1
...
So the tip from @Samson-Sharfrichter made me look at how Spark handles these connections internally, and I stumbled upon an (IMHO not very well documented) feature: SparkHadoopUtil.
After logging in with UserGroupInformation.loginUserFromKeytab and after setting up the SparkConf, I add the credentials using this utility:
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

val credentials = UserGroupInformation.getLoginUser.getCredentials
SparkHadoopUtil.get.addCurrentUserCredentials(credentials)
This makes sure each job has access to the correct credentials.
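To recap, here is the whole sequence as one sketch; the writeWithKerberos wrapper and the spark.range dummy data are mine for illustration, the rest mirrors the snippets above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession

// hadoopConfig and sparkConf are the objects built in the question above
def writeWithKerberos(hadoopConfig: Configuration, sparkConf: SparkConf): Unit = {
  // 1. Kerberos login on the driver
  UserGroupInformation.setConfiguration(hadoopConfig)
  UserGroupInformation.loginUserFromKeytab("principal@REALM", "path/to/keytab.keytab")

  // 2. Hand the logged-in user's credentials to Spark so each job can use them
  SparkHadoopUtil.get.addCurrentUserCredentials(
    UserGroupInformation.getLoginUser.getCredentials)

  // 3. Only then create the session and write
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  spark.range(10).toDF("id").write.parquet("/user/tom/dump/2018/06/11")
}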