scala, elasticsearch, apache-spark, etl, apache-spark-sql

How to add a new Struct column to a DataFrame


I'm currently trying to extract a database from MongoDB and use Spark to ingest it into ElasticSearch with geo_points.

The Mongo database has latitude and longitude values, but ElasticSearch requires them to be cast to the geo_point type.

Is there a way in Spark to copy the lat and lon columns to a new column that is an array or struct?

Any help is appreciated!


Solution

  • I assume you start with some kind of flat schema like this:

    root
     |-- lat: double (nullable = false)
     |-- long: double (nullable = false)
     |-- key: string (nullable = false)
    

    First, let's create some example data:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{col, udf}
    import org.apache.spark.sql.types._
    
    val rdd = sc.parallelize(
        Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)
    
    val schema = StructType(
        StructField("lat", DoubleType, false) ::
        StructField("long", DoubleType, false) ::
        StructField("key", StringType, false) ::Nil)
    
    val df = sqlContext.createDataFrame(rdd, schema)
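
    A quick printSchema on df should reproduce exactly the flat schema shown at the top (just a sanity check, not part of the original answer):

    df.printSchema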
    

    An easy way is to use a UDF and a case class:

    case class Location(lat: Double, long: Double)
    val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))
    
    val dfRes = df.
       withColumn("location", makeLocation(col("lat"), col("long"))).
       drop("lat").
       drop("long")
    
    dfRes.printSchema
    

    and we get

    root
     |-- key: string (nullable = false)
     |-- location: struct (nullable = true)
     |    |-- lat: double (nullable = false)
     |    |-- long: double (nullable = false)
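
    As a side note (not from the original answer), the nested fields can be selected back out with dot notation if you want to sanity-check the values:

    dfRes.select(col("key"), col("location.lat"), col("location.long")).show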
    

    A harder way is to transform your data and apply the schema afterwards:

    val rddRes = df.
        map { case Row(lat, long, key) => Row(key, Row(lat, long)) }
    
    val schemaRes = StructType(
        StructField("key", StringType, false) ::
        StructField("location", StructType(
            StructField("lat", DoubleType, false) ::
            StructField("long", DoubleType, false) :: Nil
        ), true) :: Nil 
    )
    
    sqlContext.createDataFrame(rddRes, schemaRes).show
    

    and we get the expected output:

    +------+-------------+
    |   key|     location|
    +------+-------------+
    |Warsaw|[52.23,21.01]|
    | Corte|  [42.3,9.15]|
    +------+-------------+
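
    If you happen to be on Spark 2.x rather than 1.x (an assumption; this answer targets the 1.x API), DataFrame.map requires an Encoder, so a minimal sketch of the same transformation drops to the underlying RDD first and uses a SparkSession (assumed to be called spark) instead of sqlContext:

    // Spark 2.x sketch of the "hard way"; `spark` is the SparkSession (assumed)
    val rddRes2 = df.rdd.map {
        case Row(lat: Double, long: Double, key: String) => Row(key, Row(lat, long))
    }
    
    spark.createDataFrame(rddRes2, schemaRes).show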
    

    Creating a nested schema from scratch can be tedious, so if you can, I would recommend the first approach. It can easily be extended if you need a more sophisticated structure:

    case class Pin(location: Location)
    val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)))
    
    df.
        withColumn("pin", makePin(col("lat"), col("long"))).
        drop("lat").
        drop("long").
        printSchema
    

    and we get the expected output:

    root
     |-- key: string (nullable = false)
     |-- pin: struct (nullable = true)
     |    |-- location: struct (nullable = true)
     |    |    |-- lat: double (nullable = false)
     |    |    |-- long: double (nullable = false)
    

    Unfortunately, you have no control over the nullable flag this way, so if that is important for your project you'll have to specify the schema yourself, as sketched below.
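
    For example, one option (a sketch reusing dfRes and schemaRes from above) is to re-apply an explicit schema on top of the UDF result:

    // Re-applying the declared schema gives back the exact nullability flags
    // (assumes dfRes and schemaRes defined in the examples above).
    val dfStrict = sqlContext.createDataFrame(dfRes.rdd, schemaRes)
    dfStrict.printSchema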

    Finally, you can use the struct function introduced in Spark 1.4:

    import org.apache.spark.sql.functions.struct
    import sqlContext.implicits._   // needed for the $"colname" syntax
    
    df.select($"key", struct($"lat", $"long").alias("location"))
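
    To tie this back to the ElasticSearch part of the question: a rough sketch of the write using the elasticsearch-hadoop Spark connector. The index name, the es.nodes setting, and the geo_point mapping on the ES side are assumptions for illustration, not something covered above; note that the object form of geo_point expects fields named lat and lon, hence the rename:

    import org.elasticsearch.spark.sql._
    
    // Build the struct with ES-friendly field names and write it out
    // ("places/doc" is a hypothetical index/type whose mapping declares
    // "location" as geo_point; es.nodes points at the assumed ES host).
    val esReady = df.select($"key", struct($"lat", $"long".alias("lon")).alias("location"))
    esReady.saveToEs("places/doc", Map("es.nodes" -> "localhost"))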