Tags: scala, apache-spark, apache-spark-encoders

Apache Spark 2.1: java.lang.UnsupportedOperationException: No Encoder found for scala.collection.immutable.Set[String]


I am using Spark 2.1.1 with Scala 2.11.6. I am getting the following error. I am not using any case classes.

java.lang.UnsupportedOperationException: No Encoder found for scala.collection.immutable.Set[String]
 field (class: "scala.collection.immutable.Set", name: "_2")
 field (class: "scala.Tuple2", name: "_2")
 root class: "scala.Tuple2"

The following portion of code is what the stack trace points to.

val tweetArrayRDD = nameDF.select("namedEnts", "text", "storylines")
  .flatMap {
    case Row(namedEnts: Traversable[(String, String)], text: String, storylines: Traversable[String]) =>
      Option(namedEnts) match {
        case Some(x: Traversable[(String, String)]) =>
          //println("In flatMap:" + x + " ~~&~~ " + text + " ~~&~~ " + storylines)
          namedEnts.map((_, (text, storylines.toSet)))
        case _ => //println("In flatMap: blahhhh")
          Traversable()
      }
    case _ => //println("In flatMap: fooooo")
      Traversable()
  }
  .rdd
  .aggregateByKey((Set[String](), Set[String]()))(
    (a, b) => (a._1 + b._1, a._2 ++ b._2),
    (a, b) => (a._1 ++ b._1, a._2 ++ b._2))
  .map { s: ((String, String), (Set[String], Set[String])) =>
    //println("In map: " + s)
    (s._1, (s._2._1.toSeq, s._2._2.toSeq))
  }

Solution

  • The problem here is that Spark does not provide an encoder for Set out of the box (it does provide encoders for "primitives", Seqs, Arrays, and Products of other supported types).
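
    To illustrate the difference, here is a minimal sketch (the local SparkSession below is my own setup, not part of the original code): deriving an encoder for a tuple containing a Seq works out of the box, while the same data with a Set fails exactly as in the question:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("encoders").getOrCreate()
    import spark.implicits._

    // Seq is a supported collection type, so encoder derivation succeeds:
    Seq(("a", Seq("x", "y"))).toDS().show()

    // Set is not supported; this throws at runtime while the encoder is derived:
    // java.lang.UnsupportedOperationException: No Encoder found for scala.collection.immutable.Set[String]
    Seq(("a", Set("x", "y"))).toDS()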

    You can either use this excellent answer to create your own encoder (more accurately, an encoder for the element type your flatMap produces, ((String, String), (String, Set[String])), which contains a Set[String]), or you can work around the issue by using a Seq instead of a Set:

    // ...
    case Some(x: Traversable[(String, String)]) =>
      //println("In flatMap:" + x + " ~~&~~ " + text + " ~~&~~ " + storylines)
      namedEnts.map((_, (text, storylines.toSeq.distinct)))
    // ...
    

    (I'm using distinct to imitate the Set behavior; you can also try .toSet.toSeq.)
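
    For the first option, here is a hedged sketch of that route using Spark's built-in Kryo encoder (Encoders.kryo is part of the public API; the implicit val name is my own). A monomorphic implicit val takes precedence over the generic tuple encoders from spark.implicits._, so flatMap picks it up; the trade-off is that Kryo stores each value as a single opaque binary column rather than a structured schema:

    import org.apache.spark.sql.{Encoder, Encoders}

    // Encoder for the element type produced by the flatMap, Kryo-serialized as one binary field:
    implicit val pairEncoder: Encoder[((String, String), (String, Set[String]))] =
      Encoders.kryo[((String, String), (String, Set[String]))]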

    UPDATE: regarding your comment about Spark 1.6.2: the difference is that in 1.6.2, Dataset.flatMap returns an RDD rather than a Dataset, so no encoder is required for the results returned from the function you supply. This suggests another good workaround: you can easily simulate that behavior by explicitly switching to the RDD before the flatMap operation:

    nameDF.select("namedEnts", "text", "storylines")
      .rdd
      .flatMap { /*...*/ } // use your function as-is, it can return Set[String]
      .aggregateByKey( /*...*/ )
      .map( /*...*/ )
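
    (Design note: dropping to the RDD API bypasses encoders entirely, since RDDs rely on plain Java/Kryo serialization, at the cost of Catalyst's optimizations for the subsequent steps. Since your code calls .rdd right after the flatMap anyway, moving that switch one step earlier costs essentially nothing here.)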