scala, apache-spark, apache-spark-sql, apache-spark-dataset, apache-spark-encoders

Encoder error while trying to map dataframe row to updated row


I am trying to do the same thing in my code, as shown below:

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

I took the above approach from here: Scala: How can I replace value in Dataframs using scala. However, I am getting the following encoder error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

Note: I am using Spark 2.0!


Solution

  • There is nothing unexpected here. You're trying to use code which was written for Spark 1.x and is no longer supported in Spark 2.0:

    To be honest, it didn't make much sense in 1.x either. Independent of the version, you can simply use the DataFrame API:

    import org.apache.spark.sql.functions.{when, lower}
    import spark.implicits._  // needed for toDF and the $ column syntax
    
    val df = Seq(
      (2012, "Tesla", "S"), (1997, "Ford", "E350"),
      (2015, "Chevy", "Volt")
    ).toDF("year", "make", "model")
    
    // replace "tesla" (case-insensitive) in the make column, keep everything else
    df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
    
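    For a quick sanity check, calling show() on the result should leave everything but the make column untouched (a sketch; the exact show formatting below is approximate):

    df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make")).show()
    // +----+-----+-----+
    // |year| make|model|
    // +----+-----+-----+
    // |2012|    S|    S|
    // |1997| Ford| E350|
    // |2015|Chevy| Volt|
    // +----+-----+-----+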

    If you really want to use map, you should use a statically typed Dataset:

    import spark.implicits._
    
    case class Record(year: Int, make: String, model: String)
    
    // case classes are Product types, so Record gets an encoder from spark.implicits._
    df.as[Record].map {
      case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
      case rec => rec
    }
    
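    The map above yields a Dataset[Record]; if you later need a plain DataFrame again, toDF() converts it back and keeps the field names (a small usage sketch):

    df.as[Record].map {
      case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
      case rec => rec
    }.toDF()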

    or at least return an object which will have an implicit encoder:

    // tuples of primitives and Strings also get encoders via spark.implicits._
    df.map {
      case Row(year: Int, make: String, model: String) =>
        (year, if (make.toLowerCase == "tesla") "S" else make, model)
    }
    
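    Note that mapping to a tuple produces columns named _1, _2 and _3; they can be renamed afterwards (a small sketch):

    df.map {
      case Row(year: Int, make: String, model: String) =>
        (year, if (make.toLowerCase == "tesla") "S" else make, model)
    }.toDF("year", "make", "model")  // restore meaningful column names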

    Finally, if for some completely crazy reason you really want to map over a Dataset[Row], you have to provide the required encoder:

    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    // Yup, it would be possible to reuse df.schema here
    val schema = StructType(Seq(
      StructField("year", IntegerType),
      StructField("make", StringType),
      StructField("model", StringType)
    ))
    
    val encoder = RowEncoder(schema)
    
    df.map {
      case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
        Row(year, "S", model)
      case row => row
    } (encoder)
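
    As the comment above hints, the schema (and therefore the encoder) can also be taken straight from the existing DataFrame instead of being written out by hand:

    df.map {
      case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
        Row(year, "S", model)
      case row => row
    } (RowEncoder(df.schema))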