I am trying to implement a Scala Spark Aggregator with a Map containing non-primitive types (for example, Map[String, Set[String]]) as its buffer. It seems like I can use Kryo or ExpressionEncoder to encode a collection of primitives (for example, Set[String]), but when I embed it within a Map, Spark can't seem to find the encoder.
How do I create an encoder for such nested types?
I have tried things like the following:
def bufferEncoder: Encoder[Map[String, Set[String]]] = Encoders.kryo[Map[String, Set[String]]]
and
def bufferEncoder: Encoder[Map[String, Set[String]]] = implicitly(ExpressionEncoder[Map[String, Set[String]]])
For another Aggregator I wrote, I used
def bufferEncoder: Encoder[Set[String]] = Encoders.kryo[Set[String]]
which worked.
But when I try the first two options I get this error:
java.lang.UnsupportedOperationException: No Encoder found for java.util.Map[String,Array[String]]
UPDATE:
I am adding two code samples, as simple as I can make them. The only difference is that the first uses an Array within the Map, and the second uses a Set. The first compiles and runs (the results aren't important) but the second gets the exception I have described above. Please note that in both examples, I am using a mutable Scala Map and Set.
class MapSetTest extends Aggregator[String, Map[String, Set[String]], Int] with Serializable {
  override def zero = Map[String, Set[String]]()
  override def reduce(buffer: Map[String, Set[String]], newItem: String) = {
    buffer.put(newItem, Set[String]() + newItem)
    buffer
  }
  override def merge(b1: Map[String, Set[String]], b2: Map[String, Set[String]]) = {
    b1
  }
  override def finish(reduction: Map[String, Set[String]]): Int = {
    reduction.size
  }
  def bufferEncoder = implicitly[Encoder[Map[String, Set[String]]]]
  def outputEncoder = Encoders.scalaInt
}
vs.
class MapArrayTest extends Aggregator[String, Map[String, Array[String]], Int] with Serializable {
  override def zero = Map[String, Array[String]]()
  override def reduce(buffer: Map[String, Array[String]], newItem: String) = {
    buffer.put(newItem, Array[String](newItem))
    buffer
  }
  override def merge(b1: Map[String, Array[String]], b2: Map[String, Array[String]]) = {
    b1
  }
  override def finish(reduction: Map[String, Array[String]]): Int = {
    reduction.size
  }
  def bufferEncoder = implicitly[Encoder[Map[String, Array[String]]]]
  def outputEncoder = Encoders.scalaInt
}
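For what it's worth, the buffer mutation itself is fine with mutable collections — the failure is purely in encoder resolution, not in the Map/Set logic. Here is a standalone sketch of the zero/reduce/merge operations from the classes above, spelled out with explicit scala.collection.mutable types and no Spark involved (the function names simply mirror the Aggregator methods):

```scala
import scala.collection.mutable

// Standalone analogue of the Aggregator's buffer operations,
// using mutable.Map / mutable.Set as the question does.
def zero: mutable.Map[String, mutable.Set[String]] =
  mutable.Map.empty[String, mutable.Set[String]]

def reduce(
    buffer: mutable.Map[String, mutable.Set[String]],
    newItem: String
): mutable.Map[String, mutable.Set[String]] = {
  // put returns the previous value as an Option; only the side effect matters here
  buffer.put(newItem, mutable.Set(newItem))
  buffer
}

def merge(
    b1: mutable.Map[String, mutable.Set[String]],
    b2: mutable.Map[String, mutable.Set[String]]
): mutable.Map[String, mutable.Set[String]] =
  b1 // mirrors the question: the merge result isn't important here

val buf = merge(reduce(reduce(zero, "a"), "b"), zero)
println(buf.size) // 2
```

This compiles and runs without any encoder at all, which is why only the Set variant's bufferEncoder line fails.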
Whilst, per my comment, I wouldn't recommend Kryo, this code:
def bufferEncoder: Encoder[Map[String, Set[String]]] = implicitly(ExpressionEncoder[Map[String, Set[String]]])
is not going to work on its own.
Unfortunately, point 2 isn't directly called out in the getting-started page.
import sparkSession.implicits._
val enc = implicitly[Encoder[Map[String,Set[String]]]]
will work. This, however, as you've found out, won't:
import sparkSession.implicits._
val enc = implicitly[ExpressionEncoder[Map[String,Set[String]]]]
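The reason is that sparkSession.implicits._ brings instances into scope whose declared type is Encoder[T] (even though the object it returns is an ExpressionEncoder under the hood), and implicit search matches the requested type against the declared type, not the runtime type. A toy analogue with no Spark involved (Enc, ExprEnc, and MyImplicits are made-up names for illustration):

```scala
// Toy model of why implicitly[Encoder[T]] resolves after the implicits import
// while implicitly[ExpressionEncoder[T]] does not.
trait Enc[T]
class ExprEnc[T] extends Enc[T]

object MyImplicits {
  // Declared as Enc[Map[K, V]], even though the instance is an ExprEnc at runtime
  implicit def mapEnc[K, V]: Enc[Map[K, V]] = new ExprEnc[Map[K, V]]
}

import MyImplicits._

val ok = implicitly[Enc[Map[String, Set[String]]]] // resolves to the ExprEnc instance
// implicitly[ExprEnc[Map[String, Set[String]]]]   // would NOT compile: implicit
// search only sees values declared as Enc[...], never the narrower subtype

println(ok.isInstanceOf[ExprEnc[_]]) // true
```

So ask for the interface type, Encoder[Map[String, Set[String]]], with the implicits import in scope, and resolution succeeds.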