Cluster configuration:
What I do: Read HBase data through Spark
When I use IntelliJ and local
mode everything works fine, but when I change mode to
spark-submit --master yarn
, the following stacktrace happens:
20/05/20 11:00:46 ERROR mapreduce.TableInputFormat: java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:221)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.initialize(TableInputFormat.java:200)
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:243)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:254)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at com.song.HbaseOnSpark1$.main(HbaseOnSpark1.scala:32)
at com.song.HbaseOnSpark1.main(HbaseOnSpark1.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:219)
... 27 more
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hbase.client.ConnectionImplementation.close(ConnectionImplementation.java:1938)
at org.apache.hadoop.hbase.client.ConnectionImplementation.<init>(ConnectionImplementation.java:310)
... 32 more
20/05/20 11:00:46 ERROR yarn.ApplicationMaster: User class threw exception: java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:254)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:254)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at com.song.HbaseOnSpark1$.main(HbaseOnSpark1.scala:32)
at com.song.HbaseOnSpark1.main(HbaseOnSpark1.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:558)
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:249)
... 24 more
This is my code:
val conf: SparkConf = new SparkConf().setAppName("spark1")
val spark = new SparkContext(conf)
val hbaseConf: Configuration = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03")
hbaseConf.set(TableInputFormat.INPUT_TABLE,"idx_name")
hbaseConf.set("hbase.defaults.for.version.skip", "true")
val rdd: RDD[(ImmutableBytesWritable, Result)] = spark.newAPIHadoopRDD(
hbaseConf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
its hbase classpatth issue in your cluster but you need to add hbase jars to your classpath like this
export SPARK_CLASSPATH=$SPARK_CLASSPATH:`hbase classpath`
hbase classpath
will give all the jars for hbase connections and etc....
Since all the jars required are there in ide lib
If you are using maven do a mvn depdency:tree
to understand what jars are needed in the cluster. based on that you can adjust your spark-submit
script.
if you are using --jars
option see that all jars passed correctly or uber jar has correct dependencies when packing jar..
Further reading Spark spark-submit --jars arguments wants comma list, how to declare a directory of jars?