maven, apache-spark, spark-hive

Apache Spark with Hive


How can I read/write data from/to Hive? Is it necessary to compile Spark with the hive profile to interact with Hive? Which Maven dependencies are required to interact with Hive?

I could not find good documentation to follow step by step to get this working with Hive.
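(For context: the prebuilt Spark downloads are generally compiled with Hive support already; only when building Spark from source does the -Phive profile come into play. On the dependency side, spark-hive is the Maven module that provides HiveContext. A minimal sketch, assuming Spark 1.5.1 and Scala 2.10; adjust both versions to your cluster:)

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>1.5.1</version>
    </dependency>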

Here is my current code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
// HiveContext extends SQLContext, so a separate SQLContext is not needed.
val hiveContext = new HiveContext(sc)

val schemaString = "Date:string,Open:double,High:double,Low:double,Close:double,Volume:double,Adj_Close:double"
val schema =
  StructType(
    schemaString.split(",").map { fieldName =>
      // getFieldTypeInSchema is a helper (not shown in the question) that
      // maps a type name such as "double" to the matching Spark SQL DataType.
      StructField(fieldName.split(":")(0), getFieldTypeInSchema(fieldName.split(":")(1)), nullable = true)
    })

val rdd = sc.textFile("hdfs://45.55.159.119:9000/yahoo_stocks.csv")
// Drop the CSV header line before parsing the rows.
val noHeader = rdd.mapPartitionsWithIndex((i, iter) => if (i == 0) iter.drop(1) else iter)

val rowRDDx = noHeader.map { p =>
  var list: collection.mutable.Seq[Any] = collection.mutable.Seq.empty[Any]
  var index = 0
  // rowSplittingRegexBuilder is a helper (not shown in the question) that
  // builds the row-splitting regex from a list of delimiters.
  val regex = rowSplittingRegexBuilder(Seq(","))
  val tokens = p.split(regex)
  tokens.foreach { value =>
    val valType = schema.fields(index).dataType
    var returnVal: Any = null
    valType match {
      case IntegerType   => returnVal = value.toInt
      case DoubleType    => returnVal = value.toDouble
      case LongType      => returnVal = value.toLong
      case FloatType     => returnVal = value.toFloat
      case ByteType      => returnVal = value.toByte
      case StringType    => returnVal = value
      case TimestampType => returnVal = value
    }
    list = list :+ returnVal
    index += 1
  }
  Row.fromSeq(list)
}

// Build the DataFrame with the HiveContext so it can be saved as a Hive table;
// applySchema is deprecated in favour of createDataFrame.
val df = hiveContext.createDataFrame(rowRDDx, schema)
hiveContext.sql("create table yahoo_orc_table (date STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, volume INT, adj_price FLOAT) stored as orc")
df.write.format("orc").mode(SaveMode.Append).saveAsTable("yahoo_orc_table")

I am getting the following exception:

15/10/12 14:57:36 INFO storage.BlockManagerMaster: Registered BlockManager 
15/10/12 14:57:38 INFO scheduler.EventLoggingListener: Logging events to hdfs://host:9000/spark/logs/local-1444676256555
Exception in thread "main" java.lang.VerifyError: Bad return type 
Exception Details: 
  Location: 
    org/apache/spark/sql/catalyst/expressions/Pmod.inputType()Lorg/apache/spark/sql/types/AbstractDataType; @3: areturn 
  Reason: 
    Type 'org/apache/spark/sql/types/NumericType$' (current frame, stack[0]) is not assignable to 'org/apache/spark/sql/types/AbstractDataType' (from method signature) 
  Current Frame: 
    bci: @3 
    flags: { } 
    locals: { 'org/apache/spark/sql/catalyst/expressions/Pmod' } 
    stack: { 'org/apache/spark/sql/types/NumericType$' } 
  Bytecode: 
    0000000: b200 63b0 

        at java.lang.Class.getDeclaredConstructors0(Native Method) 
        at java.lang.Class.privateGetDeclaredConstructors(Class.java:2595) 
        at java.lang.Class.getConstructor0(Class.java:2895) 
        at java.lang.Class.getDeclaredConstructor(Class.java:2066) 
        at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$4.apply(FunctionRegistry.scala:267) 
        at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$4.apply(FunctionRegistry.scala:267) 
        at scala.util.Try$.apply(Try.scala:161) 
        at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.expression(FunctionRegistry.scala:267) 
        at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.<init>(FunctionRegistry.scala:148) 
        at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.<clinit>(FunctionRegistry.scala) 
        at org.apache.spark.sql.hive.HiveContext.functionRegistry$lzycompute(HiveContext.scala:414) 
        at org.apache.spark.sql.hive.HiveContext.functionRegistry(HiveContext.scala:413) 
        at org.apache.spark.sql.UDFRegistration.<init>(UDFRegistration.scala:39) 
        at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:203) 
        at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:72)

Thanks.


Solution

  • As dr_stein mentioned, this error is usually caused by incompatible compile-time and runtime JDK versions, such as running on JDK 1.6 against jars compiled with JDK 1.7.

    I would also check that your Hive libraries are the versions you expect, and that your Hive server is running on the same JDK as your application; one way to inspect a jar's class-file version is sketched below.

    You can also try running with the -noverify JVM option, which disables bytecode verification (see the spark-submit example after that).
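    For the version check, one quick approach (a sketch; the jar name below is a placeholder, substitute your own, and the Pmod class is taken from the stack trace above) is to pull a class out of the suspect jar and read its class-file major version with javap: 50 means JDK 6, 51 means JDK 7, 52 means JDK 8.

        # Extract one class from the jar and inspect its class-file version.
        # spark-assembly.jar is a placeholder; Pmod is the class from the error above.
        unzip -o spark-assembly.jar org/apache/spark/sql/catalyst/expressions/Pmod.class
        javap -verbose org/apache/spark/sql/catalyst/expressions/Pmod.class | grep major
        # "major version: 51" -> compiled for JDK 7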
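    To try -noverify with Spark, the flag has to reach the driver JVM; one way to pass it (the application class and jar below are placeholders) is through spark-submit:

        # Disable bytecode verification in the driver JVM (a workaround, not a fix).
        spark-submit \
          --driver-java-options "-noverify" \
          --class com.example.Main \
          your-app.jar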