
Prove that a runtimeClass satisfies a type bound in Scala


I have a method that writes an RDD of one of my classes, Foo, which is defined in Thrift, in Parquet format.

  import Foo
  import org.apache.spark.rdd.RDD
  import org.apache.thrift.TBase
  import org.apache.hadoop.mapreduce.Job
  import org.apache.parquet.hadoop.ParquetOutputFormat
  import org.apache.parquet.hadoop.thrift.ParquetThriftOutputFormat

  def writeThriftParquet(rdd: RDD[Foo], outputPath: String): Unit = {
    val job = Job.getInstance()
    ParquetThriftOutputFormat.setThriftClass(job, classOf[Foo])
    ParquetOutputFormat.setWriteSupportClass(job, classOf[Foo])

    rdd
      .map(x => (null, x))
      .saveAsNewAPIHadoopFile(
        outputPath,
        classOf[Void],
        classOf[Foo],
        classOf[ParquetThriftOutputFormat[Foo]],
        job.getConfiguration)
  }

This works fine, but I'd prefer a more generic method. I tried the relatively simple version:

  def writeThriftParquetGeneral[A <: TBase[_, _]](rdd: RDD[A], outputPath: String): Unit = {
    val job = Job.getInstance()
    ParquetThriftOutputFormat.setThriftClass(job, classOf[A])
    ParquetOutputFormat.setWriteSupportClass(job, classOf[A])

    rdd
      .map(x => (null, x))
      .saveAsNewAPIHadoopFile(
        outputPath,
        classOf[Void],
        classOf[A],
        classOf[ParquetThriftOutputFormat[A]],
        job.getConfiguration)
  }

but that fails with errors like:

  class type required but A found
    ParquetThriftOutputFormat.setThriftClass(job, classOf[A])
  class type required but A found
    ParquetOutputFormat.setWriteSupportClass(job, classOf[A])
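This error occurs because classOf needs a class that is known at compile time, while A is a type parameter that is erased at runtime. The usual workaround is an implicit ClassTag, which asks the compiler to pass A's runtime class in at each call site. A minimal sketch, using a hypothetical stand-in Foo rather than the Thrift-generated class:

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for the Thrift-generated Foo (not the real class).
final class Foo

object ClassOfTypeParam {
  // classOf[A] does not compile here: A is an erased type parameter,
  // so there is no concrete class for the compiler to embed.
  // def broken[A]: Class[A] = classOf[A] // error: class type required but A found

  // An implicit ClassTag makes the compiler supply A's runtime class
  // at every call site where A is known concretely.
  def runtimeClassOf[A](implicit tag: ClassTag[A]): Class[_] = tag.runtimeClass
}
```

For a concrete type such as Foo, the compiler synthesizes the ClassTag itself, so a hand-written implicit value is typically not needed at the call site.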

To remedy that, I tried using a ClassTag, but I haven't been able to get it to compile:

  import scala.reflect._
  implicit val ct = ClassTag[Foo](classOf[Foo])

  def writeThriftParquetGeneral[A <: TBase[_, _]](rdd: RDD[A], outputPath: String)(
    implicit tag: ClassTag[A]): Unit = {
    val job = Job.getInstance()

    // The problem line
    ParquetThriftOutputFormat.setThriftClass(job, tag.runtimeClass)

    // Seems OK from here
    ParquetOutputFormat.setWriteSupportClass(job, tag.runtimeClass)

    rdd
      .map(x => (null, x))
      .saveAsNewAPIHadoopFile(
        outputPath,
        classOf[Void],
        tag.runtimeClass,
        classOf[ParquetThriftOutputFormat[A]],
        job.getConfiguration)
  }

This fails on the line ParquetThriftOutputFormat.setThriftClass(job, tag.runtimeClass) with:

[error]  found   : Class[_$1] where type _$1
[error]  required: Class[_ <: org.apache.thrift.TBase[_, _]]

I'm surprised the compiler (Scala 2.11) doesn't recognize that tag.runtimeClass must be classOf[A], and that A satisfies the type bound by definition.
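At runtime, tag.runtimeClass really is A's class; what is lost is only the static type, because runtimeClass is declared to return Class[_] and its signature mentions neither A nor A's bound. A small sketch showing both facts, with hypothetical stand-ins Base (playing the role of TBase[_, _]) and Foo:

```scala
import scala.reflect.ClassTag

// Hypothetical stand-ins: Base plays the role of TBase[_, _], Foo a Thrift class.
trait Base
final class Foo extends Base

object RuntimeClassStaticType {
  def runtimeClassOf[A <: Base](implicit tag: ClassTag[A]): Class[_] = {
    // runtimeClass is declared as Class[_], so the bound A <: Base is not
    // visible in its type; this line would not compile:
    // val bounded: Class[_ <: Base] = tag.runtimeClass
    tag.runtimeClass
  }
}
```

The returned value is Foo's class and is assignable to Base, even though the compiler only knows it as Class[_].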


Solution

  • ClassTag#runtimeClass returns only a Class[_]:

    https://github.com/scala/scala/blob/2.13.x/src/library/scala/reflect/ClassTag.scala#L55

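    Class[_ <: TBase[_, _]] is an existential type distinct from Class[_] (in fact, a subtype of it), so a value of type Class[_] is not accepted where Class[_ <: TBase[_, _]] is required:

    implicitly[Class[_ <: TBase[_, _]] <:< Class[_]] // compiles; the reverse direction does not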

    Try replacing the problem line with

    ParquetThriftOutputFormat.setThriftClass(job, tag.runtimeClass.asSubclass(classOf[TBase[_, _]]))
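The same pattern can be tried without Spark or Thrift. The sketch below assumes hypothetical stand-ins: Base for TBase[_, _], Foo for a Thrift class, and requiresBaseClass for a Java method like setThriftClass that demands a bounded Class. It shows the checked asSubclass conversion and, as an alternative, an unchecked cast to Class[A]:

```scala
import scala.reflect.ClassTag

// Hypothetical stand-ins: Base for TBase[_, _], Foo for a Thrift-generated class.
trait Base
final class Foo extends Base

object BoundedRuntimeClass {
  // Hypothetical stand-in for a method such as setThriftClass that only
  // accepts a Class whose type argument is bounded by Base.
  def requiresBaseClass(c: Class[_ <: Base]): Class[_ <: Base] = c

  // Option 1: checked conversion. asSubclass verifies at runtime that the
  // class extends Base and returns it typed as Class[_ <: Base].
  def viaAsSubclass[A <: Base](implicit tag: ClassTag[A]): Class[_ <: Base] =
    requiresBaseClass(tag.runtimeClass.asSubclass(classOf[Base]))

  // Option 2: unchecked cast. Class[A] conforms to Class[_ <: Base] because
  // A <: Base, but the cast itself is erased and never checked at runtime.
  def viaCast[A <: Base](implicit tag: ClassTag[A]): Class[_ <: Base] =
    requiresBaseClass(tag.runtimeClass.asInstanceOf[Class[A]])
}
```

With the real types, classOf[Base] becomes classOf[TBase[_, _]]. The asSubclass route fails fast with a ClassCastException if the class does not extend the bound, whereas the cast defers any mismatch to wherever the class is used later.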