apache-spark, spark-streaming, hadoop2, nfs, hadoop-plugins

Using a Spark job to put a file/data into HDFS from a shared/NFS-mounted location gives intermittent issues


I have a three-node Spark cluster using YARN as the cluster manager [running on a three-node Hadoop cluster].

My Hadoop cluster has three nodes [Master, Slave1 and Slave2]: the ResourceManager runs on Master and the NodeManagers on Slave1 & Slave2. The Spark cluster is also spread across these three nodes.

On the Master node I have created a folder /data/nfsshare, which I have mounted on Slave1 and Slave2 as /nfsshare. I have placed one file abc.txt in /data/nfsshare, and it is visible to both Slave1 and Slave2 at /nfsshare.

I have written a small Spark job that copies abc.txt from /data/nfsshare to HDFS, then performs a word count and saves that result in HDFS as well.

import org.apache.spark.{SparkConf, SparkContext}

// logger is assumed to be defined in the enclosing object
def write2HDFS(args: Array[String]): Unit = {

  val source = args(0)                    // file:// path on the shared/NFS mount
  val destination = args(1)               // HDFS path for the raw copy
  val processedDataDestination = args(2)  // HDFS path for the word-count result

  val conf = new SparkConf()
    .setAppName("WCRemoteReadHDFSWrite")
    .set("spark.hadoop.validateOutputSpecs", "true")
  val sc = new SparkContext(conf)

  logger.info("STARTING READ")

  val rdd = sc.textFile(source)

  logger.info("READING DONE")
  logger.info("STARTING WRITE")
  logger.info("rdd.toDebugString >>>>>> " + rdd.toDebugString)
  logger.info("rdd.getNumPartitions >>>>>> " + rdd.getNumPartitions)
  // Note: coalesce returns a new RDD; it does not change rdd in place
  // val coalesced = rdd.coalesce(1)
  // logger.info("getNumPartitions after coalesce(1) >>>>>> " + coalesced.getNumPartitions)

  // Copy the raw file contents to HDFS
  rdd.saveAsTextFile(destination)

  logger.info("DONE")

  // Word count, saved to a second HDFS directory
  rdd.flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
    .saveAsTextFile(processedDataDestination)

  sc.stop()
}

When I execute this code with the command:

./bin/spark-submit --class <MyClassName> --master yarn --deploy-mode cluster --verbose --executor-cores 9 --num-executors 3 --executor-memory 3G --driver-memory 1g /data/<JarName>.jar file:///data/nfsshare/abc.txt hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWrite1MB hdfs://<HDFS HOST IP>:8020/user/hduser/hdfsWriteP1MB

I am getting the following intermittent issues:

1) "InputPath doesn't exist: file:/data/nfsshare/abc.txt" came up intermittently on some runs of the job [even though the file was present at the shared/mounted path]

2) Sometimes the job status was reported as failed, yet the output directories were created with the required data

3) "Output directory already exists": sometimes the job failed because the HDFS output directory already existed --> this got resolved by increasing executor and driver memory

--> I tried running this job in both cluster and client deploy modes, but I get the same issues in both cases.

I am not sure whether the shared location being /data/nfsshare on Master but /nfsshare on the slaves makes a difference: on the command line I pass /data/nfsshare as the file path, so any executor running on a slave would have looked for /data/nfsshare and failed.

I tried running this job from all three nodes, but these intermittent issues still persist.

Any expert advice would be appreciated.

If there is a better way to put a file from a staging area/mounted location into HDFS, please share that as well.

Regards, Bhupesh


Solution

  • The intermittent "InputPath doesn't exist: file:/data/nfsshare/abc.txt" error was caused by the mounted folder names. Once I used the same path on all nodes [/data/nfsshare], the issue went away.

    My assumption is that when the job ran in cluster mode, YARN decided where to place the driver and the executors. Whenever the executors landed on the Master node [where /data/nfsshare was visible] the job worked fine, whereas executors on the other nodes only saw the path as /nfsshare and threw the path-related error. Once the mount points matched, every executor could see the file at /data/nfsshare. A quick way to check for this kind of mismatch is sketched below.
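    One way to confirm that the mount is visible under the same path on every node is a small diagnostic like the following (hypothetical, not part of the original job; it assumes `sc` is an existing SparkContext and `logger` is available). It does not guarantee a task lands on every host, but it usually surfaces a mismatch:

        // Report hosts where the local/NFS path is NOT visible to the JVM
        val path = "/data/nfsshare/abc.txt"
        val hostsMissingFile = sc
          .parallelize(1 to 100, sc.defaultParallelism)   // spread tasks across executors
          .mapPartitions { _ =>
            val host = java.net.InetAddress.getLocalHost.getHostName
            if (new java.io.File(path).exists()) Iterator.empty
            else Iterator(host)                           // path not mounted on this host
          }
          .distinct()
          .collect()
        hostsMissingFile.foreach(h => logger.warn(s"$path not visible on host $h"))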

    For the "output directory already exists" issue, Chitral Verma's code snippet helped.
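    That snippet is not reproduced here, but a common approach (a minimal sketch, assuming the standard Hadoop FileSystem API, with `sc`, `rdd` and `destination` as in the job above) is to delete the target directory before writing:

        import org.apache.hadoop.fs.{FileSystem, Path}

        // Remove the HDFS output directory if it already exists, so that
        // saveAsTextFile does not fail with "Output directory already exists"
        val outputPath = new Path(destination)
        val fs = outputPath.getFileSystem(sc.hadoopConfiguration)
        if (fs.exists(outputPath)) {
          fs.delete(outputPath, true)   // recursive delete
        }
        rdd.saveAsTextFile(destination)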