scalatypesreducescalding

Scalding: Cannot reduce with partial function


I am using scalding to do a simple word count type of things. I get an error when using partial function to expand on the tuple. The exact error message is:

Error:(15, 14) missing parameter type for expanded function
The argument types of an anonymous function must be fully known. (SLS 8.5)
Expected type was: (?, ?) => ?
      .reduce{ case ((t1, v1), (t2, v2)) =>
             ^

Adding types to the tuple does not help as well. What am I doing wrong?

TypedPipe.from(PackedAvroSource[TrackPlay](args("input")))
  .map({ t => (t.getTrackId.toString, 1) })
  .groupBy(_._1)
  .reduce{ case ((t1, v1), (t2, v2)) =>
      (t1, v1 + v2)
  }
  .values
  .write(TypedTsv[(CharSequence, Int)](args("output")))

Solution

  • You need to specify the type as the compiler cannot infer the correct type:

    .reduce[(String, Int)] { case ((t1, v1), (t2, v2)) => (t1, v1 + v2) }
    

    The type is specified as [U >: Tuple2[String, Int]] so the compiler infers Tuple2[Any, Any] and is therefore not able to compute the expected result type Tuple2[String, Int].

    More on this in section 8.5: http://www.scala-lang.org/files/archive/nightly/pdfs/ScalaReference.pdf


    As a side note, a simpler implementation for your overall logic is:

    TypedPipe.from(PackedAvroSource[TrackPlay](args("input")))
      .map(t => (t.getTrackId.toString, 1))
      .group
      .sum
      .toTypedPipe
      .write(TypedTsv[(CharSequence, Int)](args("output")))