I am using Scalding to do a simple word-count type of job. I get an error when using a partial function to unpack the tuple. The exact error message is:
Error:(15, 14) missing parameter type for expanded function
The argument types of an anonymous function must be fully known. (SLS 8.5)
Expected type was: (?, ?) => ?
.reduce{ case ((t1, v1), (t2, v2)) =>
^
Adding types to the tuple does not help either. What am I doing wrong?
TypedPipe.from(PackedAvroSource[TrackPlay](args("input")))
  .map({ t => (t.getTrackId.toString, 1) })
  .groupBy(_._1)
  .reduce{ case ((t1, v1), (t2, v2)) =>
    (t1, v1 + v2)
  }
  .values
  .write(TypedTsv[(CharSequence, Int)](args("output")))
You need to specify the type explicitly, because the compiler cannot infer it here:
.reduce[(String, Int)] { case ((t1, v1), (t2, v2)) => (t1, v1 + v2) }
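The type parameter of reduce is declared as [U >: Tuple2[String, Int]], so when the pattern-matching function is type-checked its parameter types are still unknown (hence the expected type (?, ?) => ? in the error), and the compiler is therefore not able to compute the expected result type Tuple2[String, Int].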
More on this in section 8.5 of the Scala Language Specification: http://www.scala-lang.org/files/archive/nightly/pdfs/ScalaReference.pdf
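To see the effect outside of Scalding, here is a minimal sketch in plain Scala. Values is a hypothetical stand-in that I made up for illustration; it is not Scalding's real class, it only copies the [U >: V] shape of the reduce signature. Whether the commented-out call is rejected may depend on your compiler version, so treat that part as an assumption; the explicitly typed call should compile either way.

```scala
// Hypothetical stand-in for a grouped value stream (not a Scalding type).
final case class Values[V](items: List[V]) {
  // Same shape as the signature discussed above: U only has a lower bound.
  def reduce[U >: V](fn: (U, U) => U): U = items.reduce[U](fn)
}

object ReduceInference {
  val plays: Values[(String, Int)] =
    Values(List(("a", 1), ("a", 1), ("a", 1)))

  // On the compiler from the question, a call like this is reported as
  // "missing parameter type for expanded function":
  // plays.reduce { case ((t1, v1), (t2, v2)) => (t1, v1 + v2) }

  // Fixing U explicitly makes the parameter types of the pattern known.
  val total: (String, Int) =
    plays.reduce[(String, Int)] { case ((t1, v1), (_, v2)) => (t1, v1 + v2) }

  // Annotating the function parameters directly is another option.
  val totalAnnotated: (String, Int) =
    plays.reduce { (a: (String, Int), b: (String, Int)) => (a._1, a._2 + b._2) }
}
```

Both variants give the compiler the argument types up front, which is what the SLS 8.5 message is asking for.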
As a side note, a simpler implementation for your overall logic is:
TypedPipe.from(PackedAvroSource[TrackPlay](args("input")))
  .map(t => (t.getTrackId.toString, 1))
  .group
  .sum
  .toTypedPipe
  .write(TypedTsv[(CharSequence, Int)](args("output")))
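If it helps to see what the group-and-sum step computes, here is a rough equivalent on ordinary in-memory Scala collections. This is only a sketch of the logic, not of how Scalding executes it; WordCountSketch and countByKey are names I made up for the example.

```scala
object WordCountSketch {
  // Pair every id with 1, group the pairs by id, then add up the 1s per group.
  def countByKey(ids: Seq[String]): Map[String, Int] =
    ids
      .map(id => (id, 1))
      .groupBy(_._1)
      .map { case (id, pairs) => (id, pairs.map(_._2).sum) }
}
```

For example, WordCountSketch.countByKey(Seq("a", "b", "a")) yields a map with "a" counted twice and "b" once.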