Tags: scala, apache-spark, elasticsearch, elasticsearch-spark

How to convert types when reading data from Elasticsearch using elasticsearch-spark in Spark


When I try to read data from Elasticsearch using the esRDD("index") function in elasticsearch-spark, I get results of type org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])], and when I check the values, they are all of type AnyRef. However, the ES site says:

elasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch types (and back)
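
For reference, here is roughly what my read looks like; the index name, node address, and the age field with its cast are placeholders for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val conf = new SparkConf()
  .setAppName("es-read")
  .set("es.nodes", "localhost:9200")
val sc = new SparkContext(conf)

// esRDD returns RDD[(String, Map[String, AnyRef])]: the document id plus a
// field map whose values are all AnyRef, so every access needs an explicit cast.
val rdd = sc.esRDD("index")
val firstAge = rdd.values.first()("age").asInstanceOf[Long]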

My dependencies are:

scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"  
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0"  
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.1.0"  
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.4.0"

Am I missing something? And how can I convert the types in a convenient way?


Solution

  • OK, I found a solution. If you use esRDD, all type information is lost.
    It is better to use:

    val df = sparkSession.read
      .format("org.elasticsearch.spark.sql")
      .option("es.read.field.as.array.include", "")
      .load("index")
    

    You can configure Elasticsearch in option(...); if you have already configured it elsewhere (for example on the SparkConf or SparkSession), the option call can be omitted, as in the sketch below.
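
    A minimal sketch of supplying the connection settings up front instead, assuming the standard es.nodes/es.port settings (the node address and port are placeholder values):

    import org.apache.spark.sql.SparkSession

    val sparkSession = SparkSession.builder()
      .appName("es-df-read")
      .config("es.nodes", "localhost")   // picked up by elasticsearch-spark from the Spark config
      .config("es.port", "9200")
      .getOrCreate()

    // No per-read option(...) needed once the es.* settings are in the Spark config.
    val df = sparkSession.read
      .format("org.elasticsearch.spark.sql")
      .load("index")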

    The data is returned as a DataFrame, and the data types are preserved (converted to sql.DataTypes) in the schema, as long as the conversion is supported by elasticsearch-spark.
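
    To see the recovered types you can inspect the schema and operate on the columns directly; the field names below are hypothetical:

    df.printSchema()                  // e.g. age: long, name: string, created: timestamp
    df.filter(df("age") > 30).show()  // typed column operations, no casts needed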

    And now you can do whatever you want.