Tags: scala, apache-spark, apache-spark-dataset, apache-spark-encoders

Why doesn't Dataset's foreach method require an encoder, but map does?


I have two datasets, Dataset[User] and Dataset[Book], where both User and Book are case classes. I join them like this:

val joinDS = ds1.join(ds2, "userid")

If I try to map over each element in joinDS, the compiler complains that an encoder is missing:

not enough arguments for method map: (implicit evidence$46: org.apache.spark.sql.Encoder[Unit])org.apache.spark.sql.Dataset[Unit]. Unspecified value parameter evidence$46. Unable to find encoder for type stored in a Dataset.

But the same error does not occur if I use foreach instead of map. Why doesn't foreach require an encoder as well? I have already imported all implicits from the Spark session, so why does map require an encoder at all when the dataset is the result of joining two datasets of case classes? Also, what type of dataset do I get from that join? Is it a Dataset[Row], or something else?
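
For context, here is a minimal, self-contained sketch of my setup; the User/Book fields and the sample rows are made up just so the snippet runs:

import org.apache.spark.sql.SparkSession

case class User(userid: Int, name: String)
case class Book(userid: Int, title: String)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val ds1 = Seq(User(1, "Ann")).toDS()
val ds2 = Seq(Book(1, "Dune")).toDS()

val joinDS = ds1.join(ds2, "userid")

joinDS.foreach(row => println(row)) // compiles
joinDS.map(row => println(row))     // does not compile: no Encoder[Unit] in scope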


Solution

  • TL;DR An Encoder is required to transform the outcome into the internal Spark SQL format, and there is no need for that in the case of foreach (or any other sink).

    Just take a look at the signatures. map is

    def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] 
    

    so, in plain words, it transforms records from T to U and then uses the Encoder[U] to convert the result to the internal representation.

    foreach, on the other hand, is

    def foreach(f: (T) ⇒ Unit): Unit 
    

    In other words, it doesn't produce any result. Since there is no result to store, an Encoder is simply not needed.
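
    As for the last question: the untyped join(right, "userid") returns a DataFrame, which is just a type alias for Dataset[Row]. Here is a short sketch of the contrast, reusing ds1/ds2 from the question (the "name" column is an assumed field of User):

    val joinDS = ds1.join(ds2, "userid")   // DataFrame, i.e. Dataset[Row]

    // map produces a new Dataset, so its result type needs an Encoder;
    // spark.implicits._ supplies one for String:
    val names = joinDS.map(row => row.getAs[String]("name"))   // Dataset[String]

    // foreach returns Unit -- nothing is stored back into Spark SQL,
    // so no Encoder is involved:
    joinDS.foreach(row => println(row))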