scala, apache-spark, apache-spark-sql, apache-spark-encoders

How to create an Encoder for Scala collection (to implement custom Aggregator)?


Spark 2.3.0 with Scala 2.11. I'm implementing a custom Aggregator according to the docs here. The aggregator takes three type parameters: input, buffer, and output.

My aggregator has to act upon all previous rows in the window so I declared it like this:

case class Foo(...)

object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
    // other override methods
    override def bufferEncoder: Encoder[ListBuffer[Foo]] = ???
}

One of the override methods is supposed to return the encoder for the buffer type, which in this case is a ListBuffer. I can't find any suitable encoder in org.apache.spark.sql.Encoders, nor any other way to encode this, so I don't know what to return here.

I thought of creating a new case class with a single property of type ListBuffer[Foo], using that as my buffer class, and then using Encoders.product on it, but I'm not sure whether that's necessary or whether there is something else I'm missing. Thanks for any tips.
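For reference, a minimal sketch of that wrapper idea, assuming a hypothetical Wrapper case class (the field name and Foo's shape are illustrative, not from the question):

```scala
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{Encoder, Encoders}

case class Foo(id: Long)

// Hypothetical wrapper so Encoders.product can derive an encoder;
// the aggregator's buffer type would then be Wrapper, not ListBuffer[Foo].
case class Wrapper(items: ListBuffer[Foo])

val bufferEnc: Encoder[Wrapper] = Encoders.product[Wrapper]
```

As the accepted answer shows, the extra wrapper is not actually needed.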


Solution

  • You should just let Spark SQL do its work and find the proper encoder using ExpressionEncoder as follows:

    scala> spark.version
    res0: String = 2.3.0
    
    scala> case class Mod(id: Long)
    defined class Mod
    
    scala> import scala.collection.mutable.ListBuffer
    import scala.collection.mutable.ListBuffer
    
    scala> import org.apache.spark.sql.Encoder
    import org.apache.spark.sql.Encoder
    
    scala> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    
    scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
    enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]
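Putting it together, a complete Aggregator using ExpressionEncoder for the buffer might look like the sketch below. The zero/reduce/merge/finish logic is a hypothetical example (collect rows, output whether any id is even), not something from the question:

```scala
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

case class Mod(id: Long)

// Hypothetical aggregation logic for illustration only.
object MyAggregator extends Aggregator[Mod, ListBuffer[Mod], Boolean] {
  // Empty buffer to start each group/window with.
  override def zero: ListBuffer[Mod] = ListBuffer.empty

  // Append each incoming row to the buffer.
  override def reduce(buffer: ListBuffer[Mod], row: Mod): ListBuffer[Mod] =
    buffer += row

  // Combine two partial buffers.
  override def merge(b1: ListBuffer[Mod], b2: ListBuffer[Mod]): ListBuffer[Mod] =
    b1 ++= b2

  // Example output: true if any buffered row has an even id.
  override def finish(buffer: ListBuffer[Mod]): Boolean =
    buffer.exists(_.id % 2 == 0)

  // Let Spark SQL derive the buffer encoder itself.
  override def bufferEncoder: Encoder[ListBuffer[Mod]] = ExpressionEncoder()

  override def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}
```

It can then be applied over a Dataset[Mod] via MyAggregator.toColumn.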