I am trying to create some simple custom aggregate operators in Spark using Scala.
I have created a simple hierarchy of operators, with the following super-class:
sealed abstract class Aggregator(val name: String) {
type Key = Row // org.apache.spark.sql.Row
type Value
...
}
I also have a companion object, which constructs the appropriate aggregator each time. Observe that each operator is allowed to specify the Value type it wants.
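For concreteness, the hierarchy and the companion object could look roughly like this (a minimal sketch; the SumAggregator subclass and the mapper/reducer signatures are assumptions, and Key is narrowed to String so the snippet runs without Spark):

```scala
// Hypothetical sketch of the hierarchy; SumAggregator and the mapper/reducer
// signatures are assumptions, and Key is a String stand-in for spark.sql.Row.
sealed abstract class Aggregator(val name: String) {
  type Key = String
  type Value
  def mapper(record: String): (Key, Value)
  def reducer(a: Value, b: Value): Value
}

class SumAggregator extends Aggregator("SUM") {
  type Value = Double
  def mapper(record: String): (Key, Value) = {
    val Array(k, v) = record.split(",")
    (k, v.toDouble)
  }
  def reducer(a: Value, b: Value): Value = a + b
}

// Companion object that picks the concrete operator by name
object Aggregator {
  def apply(name: String): Aggregator = name match {
    case "SUM" => new SumAggregator
    case other => throw new IllegalArgumentException(s"unknown aggregator: $other")
  }
}
```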
Now the problem appears when I try to call reduceByKey:
val agg = Aggregator("SUM")
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
The error is:
value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]
For my needs, Value can be either a numeric type or a tuple, hence the lack of a bound in its definition. If I replace the Value type declaration with:
type Value = Double
in the Aggregator class, then everything works fine. Therefore, I suppose the error comes from reduceByKey not knowing the exact Value type at compile time.
Any ideas on how to get around this?
Your RDD cannot be implicitly converted into PairRDDFunctions, because the implicit ClassTags for keys and values are missing: the conversion rddToPairRDDFunctions demands implicit ClassTag parameters for both the key and the value type.
You might want to include the class tags as implicit parameters in your Aggregator:
import scala.reflect.ClassTag

sealed abstract class Aggregator[K: ClassTag, V: ClassTag](name: String) {
  // non-implicit on purpose: an implicit val of the same type would be
  // ambiguous with the context-bound evidence when resolving `implicitly`
  val keyClassTag: ClassTag[K] = implicitly[ClassTag[K]]
  val valueClassTag: ClassTag[V] = implicitly[ClassTag[V]]
}
or maybe:
sealed abstract class Aggregator[K, V](name: String)(implicit kt: ClassTag[K], vt: ClassTag[V]) {
implicit val keyClassTag: ClassTag[K] = kt
implicit val valueClassTag: ClassTag[V] = vt
}
or maybe even:
sealed abstract class Aggregator(name: String) {
  type Key
  type Value
  implicit def keyClassTag: ClassTag[Key]
  implicit def valueClassTag: ClassTag[Value]
}
The last variant shifts the responsibility for providing the ClassTags to the implementor of the abstract class.
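Under the last variant, a hypothetical implementor fixes the abstract types and supplies the tags itself (a sketch with assumed names; Key is a String stand-in for Row, and the Double value type is an assumption):

```scala
import scala.reflect.ClassTag

sealed abstract class Aggregator(val name: String) {
  type Key
  type Value
  implicit def keyClassTag: ClassTag[Key]
  implicit def valueClassTag: ClassTag[Value]
  def reducer(a: Value, b: Value): Value
}

// Hypothetical implementor: fixes the abstract types and supplies the tags.
class SumAggregator extends Aggregator("SUM") {
  type Key = String // stand-in; in the real code this would be Row
  type Value = Double
  implicit val keyClassTag: ClassTag[Key] = ClassTag(classOf[String])
  implicit val valueClassTag: ClassTag[Value] = ClassTag.Double
  def reducer(a: Value, b: Value): Value = a + b
}
```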
Now, when using an aggregator agg built from the last variant in a reduceByKey, you have to make sure that the class tags it provides are in the current implicit scope:
val agg = Aggregator("SUM")
import agg._ // now the implicits should be visible
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
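Even without a Spark cluster, one can check that the import really brings the tags into implicit scope (a self-contained sketch; the SumAggregator and the String/Double type choices are assumptions):

```scala
import scala.reflect.ClassTag

sealed abstract class Aggregator(val name: String) {
  type Key
  type Value
  implicit def keyClassTag: ClassTag[Key]
  implicit def valueClassTag: ClassTag[Value]
}

class SumAggregator extends Aggregator("SUM") {
  type Key = String
  type Value = Double
  implicit val keyClassTag: ClassTag[Key] = ClassTag(classOf[String])
  implicit val valueClassTag: ClassTag[Value] = ClassTag.Double
}

val agg: Aggregator = new SumAggregator
import agg._ // keyClassTag and valueClassTag become eligible implicits

// agg.Key and agg.Value are abstract here, so only the imported members
// can satisfy these searches:
val kt = implicitly[ClassTag[agg.Key]]
val vt = implicitly[ClassTag[agg.Value]]
```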