Tags: scala, apache-spark, dataframe, parquet, write-error

Spark: LeaseExpiredException while writing large dataframe to parquet files


I have a large dataframe that I am writing to parquet files in HDFS, and I am getting the exception below in the executor logs:

2018-10-15 18:31:32 ERROR Executor:91 - Exception in task 41.0 in stage 0.0 (TID 1321)
org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:369)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /home/prod_out/20181007/_temporary/0/_temporary/attempt_20181015183108_0000_m_000041_0/part-00041-1185b10b-bcb1-4b7e-b732-dd6f71322b7d-c000.snappy.parquet (inode 33628528083): File does not exist. Holder DFSClient_NONMAPREDUCE_179567941_77 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3481)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3284)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3122)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3082)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:822)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:500)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)

    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1455)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1251)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:448)
2018-10-15 18:32:06 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 2189

I googled it but couldn't find a concrete solution. I disabled speculation with conf.set("spark.speculation","false"), but that didn't help either. The job finishes a few tasks and generates a few part files, then stops abruptly with this error.

Details: Spark version 2.3.1 (this did not happen on 1.6.x).
There is only one session running, which rules out the possibility of the same location being written to by a different session.
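
For reference, the write itself is nothing fancy, roughly like the sketch below (the input source and write mode are placeholders; only the output path matches the one in the log):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("parquet-write").getOrCreate()

    // Placeholder input; the real dataframe comes from upstream processing.
    val df = spark.read.parquet("/home/prod_in/20181007")

    // Plain DataFrame-to-parquet write into HDFS (output path taken from the log above).
    df.write
      .mode("overwrite")
      .parquet("/home/prod_out/20181007")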

Any pointers?

Thanks!


Solution

  • The issue is that before Spark writes the data to the specified HDFS location, it first writes it to a temporary location under that path. This two-stage commit mechanism is used to guarantee consistency of the final data set when working with file systems: on a successful write, the data is moved from the temporary location to the final location, and on a failed write, the data in the temporary location is cleaned up. In your case, a different executor thread (for example a speculative or duplicate task attempt) is likely making changes to that temporary location; when the original executor thread then looks for its file there, it is gone and HDFS throws the LeaseExpiredException. To avoid this exception (a minimal sketch follows the list below):

    1. Make sure you are not using any parallel collections (e.g. .par) to drive writes.
    2. Avoid multi-threading around the write, if applicable.
    3. Disable speculative execution: spark.conf.set("spark.speculation", "false").
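
    A minimal sketch of the safe pattern in Scala, assuming a single sequential parquet write from the driver (the input path and write mode are placeholders; the output path and the speculation setting come from the question):

    import org.apache.spark.sql.SparkSession

    // Build the session with speculation off, so no duplicate task attempts
    // touch the _temporary directory of the write.
    val spark = SparkSession.builder()
      .appName("parquet-write")
      .config("spark.speculation", "false")
      .getOrCreate()

    // Risky: driving writes from a parallel collection or extra driver threads can
    // start overlapping jobs against the same output directory; those jobs share its
    // _temporary folder, and one job's commit/cleanup can delete files another job
    // is still writing, which surfaces as the LeaseExpiredException above.
    // (inputs below is a hypothetical Seq[DataFrame], shown only to illustrate the anti-pattern.)
    //   inputs.par.foreach(df => df.write.mode("append").parquet("/home/prod_out/20181007"))

    // Safer: a single sequential write (or a plain foreach, one job at a time).
    val df = spark.read.parquet("/home/prod_in/20181007")   // placeholder input
    df.write
      .mode("overwrite")
      .parquet("/home/prod_out/20181007")                   // output path from the log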