scala, apache-spark, generics, serialization, apache-spark-encoders

org.apache.spark.SparkRuntimeException: Only expression encoders are supported for now


I'm working with generics and encoders on Spark Datasets, and I'm hitting the above error with code that looks like the following. Please ignore the semantics of the code; it's just a simplified reproduction of the use case.

I'm using Spark 3.2.1 and Scala 2.12.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

abstract class SparkRunner[D](
                               deserializeFunc: Array[Byte] => Option[D])
                             (implicit spark: SparkSession, mapper: ObjectMapper)
  extends Serializable {
  import spark.implicits._

  val linesOfBytes: Dataset[Array[Byte]] = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .as[String].map(mapper.writeValueAsBytes)


  // Abstract encoders that the concrete subclass must provide
  implicit val encD: Encoder[D]
  implicit val encOD: Encoder[Option[D]]


  val transformedDf =
    linesOfBytes
      .map(deserializeFunc)

  def start(): StreamingQuery = {
    transformedDf.writeStream
      //todo: Adopt config builder pattern for sink and src properties eventually.
      .format("console")
      .queryName("Query1")
      .trigger(Trigger.ProcessingTime(0))
      .start()
  }

}

object SparkMain {

  def main(args: Array[String]): Unit = {

    implicit val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("A")
      .getOrCreate()

    implicit val objMapper: ObjectMapper = new ObjectMapper()

    case class Payload(payload: ObjectNode)


    def deserialize(bytes: Array[Byte]): Option[String] = {
      objMapper.registerModule(DefaultScalaModule)
      val o = objMapper.readValue(bytes, classOf[Payload])
      Some(objMapper.writeValueAsString(o.payload))
    }

    try new SparkRunner(deserialize) {
//      override implicit val encD: Encoder[String] = org.apache.spark.sql.Encoders.kryo[String]
//      override implicit val encOD: Encoder[Option[String]] = org.apache.spark.sql.Encoders.kryo[Option[String]]
      override implicit val encD: Encoder[String] = ExpressionEncoder()
      override implicit val encOD: Encoder[Option[String]] = ExpressionEncoder()
    }.start().awaitTermination()
    finally {
      spark.close()
    }
  }
}

//SparkMain.main(Array("")) // Uncomment if you're running this in spark-shell

Everything above works if I replace the generics with concrete types.


Solution

Make transformedDf a lazy val:

    lazy val transformedDf =
      linesOfBytes
        .map(deserializeFunc)

lazy (or def) is needed because your object isn't fully constructed yet when transformedDf is initialized; if you debug and step into the map call you'll see a null value for the implicit Encoder evidence field. As the Spark code is lazy anyway, you may as well use def.
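
For illustration, a def-based variant of the same field (a minimal sketch; it assumes encD and encOD stay as the abstract implicit vals from the question and the rest of SparkRunner is unchanged):

    // Re-evaluated on each reference, which is cheap because nothing runs
    // until the stream starts; by then the anonymous subclass has initialized
    // encD and encOD, so the implicit Encoder evidence is no longer null.
    def transformedDf: Dataset[Option[D]] =
      linesOfBytes.map(deserializeFunc)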

In your code example you already pass in implicits, so why not also pass those two encoders (or the four from the original SO post) into SparkRunner? They seem a perfect case for a type class; a sketch of that approach follows.
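
A minimal sketch of that approach, assuming you're happy to require the encoders at construction time (the class body is otherwise the same as in the question):

    abstract class SparkRunner[D](deserializeFunc: Array[Byte] => Option[D])
                                 (implicit spark: SparkSession,
                                  mapper: ObjectMapper,
                                  encD: Encoder[D],
                                  encOD: Encoder[Option[D]])
      extends Serializable {
      import spark.implicits._

      val linesOfBytes: Dataset[Array[Byte]] = spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()
        .as[String].map(mapper.writeValueAsBytes)

      // encOD is already constructed when this line runs, so a plain val is fine
      val transformedDf: Dataset[Option[D]] = linesOfBytes.map(deserializeFunc)

      // start() unchanged from the question
    }

At the call site the encoders only need to be in implicit scope (encS/encOS are just illustrative names):

    implicit val encS: Encoder[String] = ExpressionEncoder()
    implicit val encOS: Encoder[Option[String]] = ExpressionEncoder()

    new SparkRunner(deserialize) {}.start().awaitTermination()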

That said, you can also declare the implicits as defs in both the base class and the implementation and achieve the same result.
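
A sketch of that variant, changing only the lines shown (everything else stays as in the question):

    // In the base class: abstract implicit defs instead of vals. When the base
    // class constructor evaluates transformedDf, the implicit lookup goes through
    // these defs, which dispatch to the subclass and build the encoder on demand
    // instead of reading a not-yet-initialized val.
    implicit def encD: Encoder[D]
    implicit def encOD: Encoder[Option[D]]

    // In the anonymous subclass:
    override implicit def encD: Encoder[String] = ExpressionEncoder()
    override implicit def encOD: Encoder[Option[String]] = ExpressionEncoder()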