scala apache-spark types scala-generics

Spark reduceByKey with generic types (Scala)


I am trying to create some simple custom aggregate operators in Spark using Scala.

I have created a simple hierarchy of operators, with the following super-class:

sealed abstract class Aggregator(val name: String) {
  type Key = Row  // org.apache.spark.sql.Row
  type Value

  ...
}

I also have a companion object, which constructs the appropriate aggregator each time. Observe that each operator is allowed to specify the Value type it wants.

Now the problem is when I try to call reduceByKey:

val agg = Aggregator("SUM")
val res = rdd
    .map(agg.mapper)
    .reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))

The error is:

value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]

For my needs, Value can be either a numeric type or a tuple, which is why it is declared without bounds. If I replace the Value type declaration with:

type Value = Double

in the Aggregator class, then everything works fine. Therefore, I suppose that the error is related to reduceByKey not knowing the exact Value type at compile time.

Any ideas on how to get around this?


Solution

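  • Your RDD cannot be implicitly converted into PairRDDFunctions, because the implicit ClassTags for the key and value types are missing. The conversion needs a ClassTag[K] and a ClassTag[V], and the compiler cannot produce one for an abstract type such as agg.Value unless one is already in implicit scope.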

    You might want to include the class tags as implicit parameters in your Aggregator:

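    import scala.reflect.ClassTag

    // Context bounds require a ClassTag for K and V at construction time;
    // the implicit vals re-expose them to users of the aggregator.
    sealed abstract class Aggregator[K: ClassTag, V: ClassTag](name: String) {
      implicit val keyClassTag: ClassTag[K] = implicitly
      implicit val valueClassTag: ClassTag[V] = implicitly
    }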
    

    or maybe:

    sealed abstract class Aggregator[K, V](name: String)(implicit kt: ClassTag[K], vt: ClassTag[V]) {
      implicit val keyClassTag: ClassTag[K] = kt
      implicit val valueClassTag: ClassTag[V] = vt
    }
    

    or maybe even:

    sealed abstract class Aggregator(name: String) {
      type K
      type V
      implicit def keyClassTag: ClassTag[K]
      implicit def valueClassTag: ClassTag[V]
    }
    

    The last variant would shift the responsibility for providing the ClassTags to the implementor of the abstract class.

    Now, when using an aggregator agg in a reduceByKey (here with the type members named Key and Value, as in the question), you have to make sure that the class tags it provides are in the current implicit scope:

    val agg = Aggregator("SUM")
    import agg._ // now the implicits should be visible
    val res = rdd
      .map(agg.mapper)
      .reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
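
To see the mechanism without a Spark dependency, here is a minimal sketch of the last variant. FakeRDD, FakePairFunctions, the Sum class, and the shapes of mapper and reducer are hypothetical stand-ins and not Spark's API or the asker's real code. The key type is String rather than Row so that the example compiles on its own. The point it illustrates is that reduceByKey is only reachable through an implicit conversion that asks for a ClassTag of both type parameters, and that the aggregator can supply the one for its abstract Value.

```scala
import scala.language.implicitConversions
import scala.reflect.ClassTag

// Hypothetical stand-in for Spark's RDD so the sketch runs without Spark.
final case class FakeRDD[T](items: Seq[T]) {
  def map[U](f: T => U): FakeRDD[U] = FakeRDD(items.map(f))
}

object FakeRDD {
  // Like Spark's conversion to PairRDDFunctions, this view only applies
  // when a ClassTag can be found for both the key and the value type.
  implicit def toPairFunctions[K: ClassTag, V: ClassTag](rdd: FakeRDD[(K, V)]): FakePairFunctions[K, V] =
    new FakePairFunctions(rdd)
}

// Hypothetical stand-in for PairRDDFunctions.
final class FakePairFunctions[K, V](self: FakeRDD[(K, V)]) {
  def reduceByKey(f: (V, V) => V): Map[K, V] =
    self.items.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
}

// Third variant from above: each implementor supplies the ClassTag for Value.
sealed abstract class Aggregator(val name: String) {
  type Key = String // assumption: the question uses Row here
  type Value
  implicit def valueClassTag: ClassTag[Value]
  def mapper(record: (String, Int)): (Key, Value)
  def reducer(a: Value, b: Value): Value
}

object Aggregator {
  def apply(name: String): Aggregator = name match {
    case "SUM" => new Sum
    case other => throw new IllegalArgumentException(s"unknown aggregator: $other")
  }
}

final class Sum extends Aggregator("SUM") {
  type Value = Double
  implicit val valueClassTag: ClassTag[Double] = ClassTag.Double
  def mapper(record: (String, Int)): (Key, Value) = (record._1, record._2.toDouble)
  def reducer(a: Double, b: Double): Double = a + b
}

object Demo {
  def main(args: Array[String]): Unit = {
    val agg = Aggregator("SUM")
    import agg._ // brings agg.valueClassTag into the implicit scope
    val rdd = FakeRDD(Seq(("a", 1), ("a", 3), ("b", 2)))
    val res = rdd
      .map(agg.mapper)
      .reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
    println(res("a")) // 4.0
    println(res("b")) // 2.0
  }
}
```

The call site only ever sees agg.Value as an abstract type, so the ClassTag has to come from the aggregator instance itself, which is what the import makes available.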