apache-spark hdfs kerberos webhdfs

Storing Parquet to Kerberos-secured WebHDFS from Spark


I'm writing from Spark to a WebHDFS path that is secured by Kerberos. Part of it actually works, but it breaks down when writing the Parquet files to the (Web)HDFS location.

The authentication & authorization work and the script creates the path structure needed to store the partquet files to, but when the actual writing starts the OutputStreams start failing.

The Spark config looks like this (I made it a bit more verbose):

  // Imports assumed for this snippet; the key constants presumably come
  // from org.apache.hadoop.fs.CommonConfigurationKeysPublic
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.CommonConfigurationKeysPublic._
  import org.apache.hadoop.security.UserGroupInformation
  import org.apache.spark.SparkConf

  val hadoopConfig = new Configuration()
  hadoopConfig.set(FS_DEFAULT_NAME_KEY, "webhdfs://hadoop-host:14000/webhdfs/v1")
  hadoopConfig.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos")
  hadoopConfig.set(HADOOP_SECURITY_AUTHORIZATION, "true")

  UserGroupInformation.setConfiguration(hadoopConfig)
  UserGroupInformation.loginUserFromKeytab("principal@REALM", "path/to/keytab.keytab")

  new SparkConf()
    .setIfMissing("spark.master", "local[*]")
    .set("spark.yarn.keytab", "path/to/keytab.keytab")
    .set("spark.yarn.principal", "principal@REALM")
    .set("spark.hadoop." + FS_DEFAULT_NAME_KEY, "webhdfs://hadoop-host:14000/webhdfs/v1")
    .set("spark.hadoop." + HADOOP_SECURITY_AUTHENTICATION, "kerberos")
    .set("spark.hadoop." + HADOOP_SECURITY_AUTHORIZATION, "true")

And when I use spark.write.parquet (or .text) it does create the path as mentioned (e.g. with target /user/tom/dump/2018/06/11 it does create that directory path on HDFS), but when the job gets to the point where it is going to store the actual data, it fails with this error:

...
11:43:56,668  INFO TaskSetManager:54 - Starting task 0.0 in stage 1.0 (TID 19, localhost, executor driver, partition 0, ANY, 7754 bytes)
11:43:56,668  INFO Executor:54 - Running task 0.0 in stage 1.0 (TID 19)
11:43:56,674  INFO deprecation:1129 - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
11:43:56,702  INFO ShuffleBlockFetcherIterator:54 - Getting 19 non-empty blocks out of 19 blocks
11:43:56,703  INFO ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 3 ms
11:43:56,713  INFO SQLHadoopMapReduceCommitProtocol:54 - Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
11:43:57,016 ERROR Utils:91 - Aborting task
java.io.IOException: Error writing request body to server
    at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3536)
    at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3519)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.unsafe.types.UTF8String.writeTo(UTF8String.java:182)
    at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.write(TextFileFormat.scala:163)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:392)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
11:43:57,186  WARN FileOutputCommitter:467 - Could not delete webhdfs://hadoop-cm001.ix5.shared.prod.st.ecg.so:14000/user/mim_dev/cdata/user_profiling_dump/cdata/2018/06/11/_temporary/0/_temporary/attempt_20180611114356_0001_m_000000_0
11:43:57,187 ERROR FileFormatWriter:70 - Job job_20180611114356_0001 aborted.
11:43:57,188  WARN Utils:87 - Suppressing exception in catch: Unauthorized
org.apache.hadoop.security.AccessControlException: Unauthorized
    at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:334)
    at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access$200(WebHdfsFileSystem.java:91)
    at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathOutputStreamRunner$1.close(WebHdfsFileSystem.java:787)
    at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.close(TextFileFormat.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:275)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1420)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
11:43:57,190 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 19)
org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error writing request body to server
    at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3536)
    at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3519)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.unsafe.types.UTF8String.writeTo(UTF8String.java:182)
    at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.write(TextFileFormat.scala:163)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:392)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
    ... 8 more
    Suppressed: org.apache.hadoop.security.AccessControlException: Unauthorized
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:334)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access$200(WebHdfsFileSystem.java:91)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathOutputStreamRunner$1.close(WebHdfsFileSystem.java:787)
        at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.close(TextFileFormat.scala:169)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:275)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1420)
        ... 9 more
11:43:57,199  WARN TaskSetManager:66 - Lost task 0.0 in stage 1.0 (TID 19, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error writing request body to server
    at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3536)
    at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3519)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.unsafe.types.UTF8String.writeTo(UTF8String.java:182)
    at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.write(TextFileFormat.scala:163)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:392)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
    ... 8 more
    Suppressed: org.apache.hadoop.security.AccessControlException: Unauthorized
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:334)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access$200(WebHdfsFileSystem.java:91)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathOutputStreamRunner$1.close(WebHdfsFileSystem.java:787)
        at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.close(TextFileFormat.scala:169)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:275)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1420)
        ... 9 more

11:43:57,200 ERROR TaskSetManager:70 - Task 0 in stage 1.0 failed 1 times; aborting job
11:43:57,201  INFO TaskSchedulerImpl:54 - Removed TaskSet 1.0, whose tasks have all completed, from pool default
11:43:57,203  INFO TaskSchedulerImpl:54 - Cancelling stage 1
...

Solution

  • So the tip from @Samson-Sharfrichter made me look at how Spark handles these connections internally, and I stumbled upon a feature that is (IMHO) not very well documented: SparkHadoopUtil.

    After logging in with UserGroupInformation.loginUserFromKeytab and setting up the SparkConf, I add the credentials using this utility:

    // Assuming SparkHadoopUtil comes from org.apache.spark.deploy
    import org.apache.spark.deploy.SparkHadoopUtil

    val credentials = UserGroupInformation.getLoginUser.getCredentials
    SparkHadoopUtil.get.addCurrentUserCredentials(credentials)
    

    This makes sure each job has access to the correct credentials.
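
    For completeness, here is the whole sequence as a sketch. The SparkSession and the final write at the end are illustrative assumptions; the rest mirrors the snippets above:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.sql.SparkSession

    // 1. Hadoop config + Kerberos login (same as in the question)
    val hadoopConfig = new Configuration()
    hadoopConfig.set("fs.defaultFS", "webhdfs://hadoop-host:14000/webhdfs/v1")
    hadoopConfig.set("hadoop.security.authentication", "kerberos")
    hadoopConfig.set("hadoop.security.authorization", "true")
    UserGroupInformation.setConfiguration(hadoopConfig)
    UserGroupInformation.loginUserFromKeytab("principal@REALM", "path/to/keytab.keytab")

    // 2. SparkConf (same as in the question)
    val sparkConf = new SparkConf()
      .setIfMissing("spark.master", "local[*]")
      .set("spark.yarn.keytab", "path/to/keytab.keytab")
      .set("spark.yarn.principal", "principal@REALM")

    // 3. The missing step: hand the login user's credentials to Spark
    val credentials = UserGroupInformation.getLoginUser.getCredentials
    SparkHadoopUtil.get.addCurrentUserCredentials(credentials)

    // 4. Illustrative assumption: build the session and write as before
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    // df.write.parquet("/user/tom/dump/2018/06/11") now runs with the proper credentials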