How can I read/write data from/to Hive? Is it necessary to compile Spark with the hive profile to interact with Hive, and which Maven dependencies are required?
I could not find any documentation that walks through this step by step.
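For context, this is roughly what my build pulls in (sbt; the Spark 1.5.1 / Scala 2.10 versions are what I believe I am running), with spark-hive added on the guess that it is the artifact needed for HiveContext:

// build.sbt sketch -- versions are my assumption; the point is that
// spark-core, spark-sql and spark-hive should all come from the same Spark release
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.1",
  "org.apache.spark" %% "spark-sql"  % "1.5.1",
  "org.apache.spark" %% "spark-hive" % "1.5.1"
)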
Here is my current code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types._

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val sqlCon = new SQLContext(sc)

val schemaString = "Date:string,Open:double,High:double,Low:double,Close:double,Volume:double,Adj_Close:double"
val schema =
  StructType(
    schemaString.split(",").map(fieldName =>
      StructField(fieldName.split(":")(0), getFieldTypeInSchema(fieldName.split(":")(1)), true)))

val rdd = sc.textFile("hdfs://45.55.159.119:9000/yahoo_stocks.csv")
//val rdd = sc.parallelize(arr)

// noHeader is rdd with the header row removed; that step and the helpers
// getFieldTypeInSchema / rowSplittingRegexBuilder are defined elsewhere in my code
val rowRDDx = noHeader.map(p => {
  var list: collection.mutable.Seq[Any] = collection.mutable.Seq.empty[Any]
  var index = 0
  val regex = rowSplittingRegexBuilder(Seq(","))
  val tokens = p.split(regex)
  tokens.foreach(value => {
    val valType = schema.fields(index).dataType
    var returnVal: Any = null
    valType match {
      case IntegerType   => returnVal = value.toString.toInt
      case DoubleType    => returnVal = value.toString.toDouble
      case LongType      => returnVal = value.toString.toLong
      case FloatType     => returnVal = value.toString.toFloat
      case ByteType      => returnVal = value.toString.toByte
      case StringType    => returnVal = value.toString
      case TimestampType => returnVal = value.toString
    }
    list = list :+ returnVal
    index += 1
  })
  Row.fromSeq(list)
})

val df = sqlCon.applySchema(rowRDDx, schema)
hiveContext.sql("create table yahoo_orc_table (date STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, volume INT, adj_price FLOAT) stored as orc")
df.saveAsTable("hive", "org.apache.spark.sql.hive.orc", SaveMode.Append)
I am getting the following exception:
15/10/12 14:57:36 INFO storage.BlockManagerMaster: Registered BlockManager
15/10/12 14:57:38 INFO scheduler.EventLoggingListener: Logging events to hdfs://host:9000/spark/logs/local-1444676256555
Exception in thread "main" java.lang.VerifyError: Bad return type
Exception Details:
Location:
org/apache/spark/sql/catalyst/expressions/Pmod.inputType()Lorg/apache/spark/sql/types/AbstractDataType; @3: areturn
Reason:
Type 'org/apache/spark/sql/types/NumericType$' (current frame, stack[0]) is not assignable to 'org/apache/spark/sql/types/AbstractDataType' (from method signature)
Current Frame:
bci: @3
flags: { }
locals: { 'org/apache/spark/sql/catalyst/expressions/Pmod' }
stack: { 'org/apache/spark/sql/types/NumericType$' }
Bytecode:
0000000: b200 63b0
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2595)
at java.lang.Class.getConstructor0(Class.java:2895)
at java.lang.Class.getDeclaredConstructor(Class.java:2066)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$4.apply(FunctionRegistry.scala:267)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$4.apply(FunctionRegistry.scala:267)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.expression(FunctionRegistry.scala:267)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.<init>(FunctionRegistry.scala:148)
at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.<clinit>(FunctionRegistry.scala)
at org.apache.spark.sql.hive.HiveContext.functionRegistry$lzycompute(HiveContext.scala:414)
at org.apache.spark.sql.hive.HiveContext.functionRegistry(HiveContext.scala:413)
at org.apache.spark.sql.UDFRegistration.<init>(UDFRegistration.scala:39)
at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:203)
at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:72)
Thanks.
As dr_stein mentioned, this error is usually caused by incompatible compile-time and runtime JDK versions, such as running jars compiled with JDK 1.7 on a 1.6 JVM.
I would also check whether your Hive libraries are the correct versions and whether your Hive server is running on the same JDK as your application.
You can also try running with the -noverify JVM option, which disables bytecode verification.
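A minimal sketch of both ideas, assuming an sbt build (the 1.7 target and launching through spark-submit are assumptions on my part):

// build.sbt sketch -- pin the bytecode target to the JDK your Spark/Hive services
// actually run on; 1.7 here is just a placeholder
javacOptions ++= Seq("-source", "1.7", "-target", "1.7")
scalacOptions += "-target:jvm-1.7"

// -noverify can be handed to the driver JVM when launching with spark-submit, e.g.
//   spark-submit --driver-java-options "-noverify" ...
// but treat it as a diagnostic, not a fix -- the version mismatch is the real problem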