apache-spark, hadoop, hbase, cloudera-cdh

Unable to read HBase data with Spark in YARN cluster mode


Cluster configuration:

What I do: Read HBase data through Spark

When I run it from IntelliJ in local mode everything works fine, but when I switch to spark-submit --master yarn, I get the following stack trace:

20/05/20 11:00:46 ERROR mapreduce.TableInputFormat: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:221)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.initialize(TableInputFormat.java:200)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:243)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:254)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at com.song.HbaseOnSpark1$.main(HbaseOnSpark1.scala:32)
    at com.song.HbaseOnSpark1.main(HbaseOnSpark1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:219)
    ... 27 more
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.hbase.client.ConnectionImplementation.close(ConnectionImplementation.java:1938)
    at org.apache.hadoop.hbase.client.ConnectionImplementation.<init>(ConnectionImplementation.java:310)
    ... 32 more

20/05/20 11:00:46 ERROR yarn.ApplicationMaster: User class threw exception: java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:254)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:254)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at com.song.HbaseOnSpark1$.main(HbaseOnSpark1.scala:32)
    at com.song.HbaseOnSpark1.main(HbaseOnSpark1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:558)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:249)
    ... 24 more

This is my code:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    val conf: SparkConf = new SparkConf().setAppName("spark1")
    val spark = new SparkContext(conf)

    // HBase client configuration: ZooKeeper quorum and the table to scan
    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "idx_name")
    hbaseConf.set("hbase.defaults.for.version.skip", "true")

    // Read the table as an RDD of (row key, Result) pairs
    val rdd: RDD[(ImmutableBytesWritable, Result)] = spark.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

Solution

  • It's an HBase classpath issue in your cluster: you need to add the HBase jars to your classpath, like this:

     export SPARK_CLASSPATH=$SPARK_CLASSPATH:`hbase classpath`
    

    Running hbase classpath prints all the jars needed for HBase connections, etc.
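    If your Spark version warns that SPARK_CLASSPATH is deprecated (Spark 2.x does), the same jars can be put on the driver and executor classpaths at submit time instead. A minimal sketch, assuming the HBase client is installed at the same paths on every YARN node and using a placeholder application jar name:

     # Sketch only: pass the output of `hbase classpath` to the driver and executors
     spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --class com.song.HbaseOnSpark1 \
       --conf "spark.driver.extraClassPath=$(hbase classpath)" \
       --conf "spark.executor.extraClassPath=$(hbase classpath)" \
       your-app.jar   # placeholder name for your application jar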

    Why does it work in local mode?

    Because all the required jars are already present on the IDE's classpath (the local lib directory).


    If you are using Maven, run mvn dependency:tree to understand which jars are needed on the cluster; based on that you can adjust your spark-submit script.
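    For example, with the standard maven-dependency-plugin you can print the whole tree or filter it down to the HBase artifacts:

     # Full dependency tree, then filtered to HBase artifacts only
     mvn dependency:tree
     mvn dependency:tree -Dincludes=org.apache.hbase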

    If you are using the --jars option, check that all the jars are passed correctly, or, if you package an uber jar, that it contains the correct dependencies.
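    A sketch of the --jars form (note that --jars takes a comma-separated list); the paths below are only illustrative and should be adjusted to wherever the HBase jars actually live on your cluster:

     # Illustrative CDH parcel paths -- point these at your actual HBase jars
     HBASE_LIB=/opt/cloudera/parcels/CDH/lib/hbase/lib
     spark-submit \
       --master yarn \
       --class com.song.HbaseOnSpark1 \
       --jars "$HBASE_LIB/hbase-client.jar,$HBASE_LIB/hbase-common.jar,$HBASE_LIB/hbase-mapreduce.jar" \
       your-app.jar   # placeholder name for your application jar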

    There might also be a jar conflict; check that carefully against the local-mode environment, since that one works fine.

    Further reading: Spark spark-submit --jars arguments wants comma list, how to declare a directory of jars?