scala, apache-spark, serialization, kryo

Error when enabling Kryo Serializer with Spark


I built a pipeline with Spark 3.3.0 using Scala.

I had been using the Java serializer until now without issue. Two days ago I ran into some performance issues and tried to switch to the Kryo serializer instead, following this gist, without success. I get the following compilation error.

type mismatch;
 found   : Array[Class[_ >: ?0(in lazy value spark) with (some other)?0(in lazy value spark) with (some other)(some other)?0(in lazy value spark) with (some other)(some other)?0(in lazy value spark) with (some other)(some other)(some other)?0(in lazy value spark) with (some other)(some other)(some other)?0(in lazy value spark) with (some other)(some other)(some other)(some other)?0(in lazy value spark) with (some other)(some other)(some other)(some other)?0(in lazy value spark) with Array[org.apache.spark.sql.types.DataType] ...
 required: Array[Class[_]]
Note: Class[_ >: ?0 with ?0 with ?0 with ?0 with ?0 with ?0 with ?0 with ?0 with Array[org.apache.spark.sql.types.DataType] with org.apache.spark.util.collection.BitSet with ?0 with ?0 with org.apache.spark.sql.catalyst.expressions.UnsafeRow with Array[org.apache.spark.sql.catalyst.InternalRow] with ?0 with org.apache.spark.sql.types.ArrayType with org.apache.spark.sql.types.Metadata with ?0 with ?0 with ?0 with ?0 with ?0 with Array[org.apache.spark.sql.types.StructField] with org.apache.spark.sql.types.StructField with Array[org.apache.spark.sql.types.StructType] with org.apache.spark.sql.types.StructType with collection.mutable.WrappedArray.ofRef[_$1] forSome { type _$1 }] <: Class[_], but class Array is invariant in type T.
You may wish to investigate a wildcard type such as _ <: Class[_]. (SLS 3.2.10)

It seems like a type mismatch error, but I am not a Scala expert, so I do not know how to fix it. Can anyone help me fix this?

Below is my code.


import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def getKryoConfig() = {
    val conf = Array(
      classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
      classOf[org.apache.spark.sql.types.StructType],
      classOf[Array[org.apache.spark.sql.types.StructType]],
      classOf[org.apache.spark.sql.types.StructField],
      classOf[Array[org.apache.spark.sql.types.StructField]],
      Class.forName("org.apache.spark.sql.types.StringType$"),
      Class.forName("org.apache.spark.sql.types.LongType$"),
      Class.forName("org.apache.spark.sql.types.BooleanType$"),
      Class.forName("org.apache.spark.sql.types.DoubleType$"),
      Class.forName("[[B"),
      classOf[org.apache.spark.sql.types.Metadata],
      classOf[org.apache.spark.sql.types.ArrayType],
      Class.forName("org.apache.spark.sql.execution.joins.UnsafeHashedRelation"),
      classOf[org.apache.spark.sql.catalyst.InternalRow],
      classOf[Array[org.apache.spark.sql.catalyst.InternalRow]],
      classOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow],
      Class.forName("org.apache.spark.sql.execution.joins.LongHashedRelation"),
      Class.forName("org.apache.spark.sql.execution.joins.LongToUnsafeRowMap"),
      classOf[org.apache.spark.util.collection.BitSet],
      classOf[org.apache.spark.sql.types.DataType],
      classOf[Array[org.apache.spark.sql.types.DataType]],
      Class.forName("org.apache.spark.sql.types.NullType$"),
      Class.forName("org.apache.spark.sql.types.IntegerType$"),
      Class.forName("org.apache.spark.sql.types.TimestampType$"),
      Class.forName("org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult"),
      Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
      Class.forName("scala.collection.immutable.Set$EmptySet$"),
      Class.forName("scala.reflect.ClassTag$$anon$1"),
      Class.forName("java.lang.Class")
    )

    conf
  }

val spark: SparkSession = {
    val conf_spark: SparkConf = new SparkConf()

    val kryo_config = getKryoConfig()
    conf_spark.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf_spark.set("spark.kryo.registrationRequired", "true")
    conf_spark.set("spark.kryoserializer.buffer", "1024k")
    conf_spark.set("spark.kryoserializer.buffer.max", "1024m")
    conf_spark.registerKryoClasses(kryo_config)

    SparkSession.builder()
      .config(conf_spark)
      .getOrCreate()
  }


Solution

  • To make your code compile, it's enough to provide the type of val conf explicitly:

    val conf: Array[Class[_]] = Array(...
    

    rather than just

    val conf = Array(...
    

    Otherwise the type is inferred too precisely: the compiler computes the least upper bound of all the element types, which yields the huge existential type shown in the error message, and an Array of that type does not conform to the Array[Class[_]] that registerKryoClasses expects.

    Array[Class[_ >: ... with ... with ...]] and Array[Class[_]] are so-called existential types.

    What is an existential type?

    https://stackoverflow.com/questions/tagged/existential-type%2bscala?tab=Votes

    Normally Scala infers types correctly, but sometimes you have to provide hints. The sketches below show the failure in isolation and the fix applied to the code from the question.
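
    To see the inference issue in isolation, here is a minimal sketch; the inferred type in the comments is illustrative, and its exact rendering varies by compiler version:

    // Without an annotation, Scala infers the least upper bound of the
    // element types; for classOf[String] and classOf[Int] that is an
    // existential like Class[_ >: Int with String], not Class[_].
    val inferred = Array(classOf[String], classOf[Int])

    // Array is invariant, so Array[Class[_ >: Int with String]] does not
    // conform to Array[Class[_]]; uncommenting the next line reproduces
    // the same kind of type mismatch as in the question:
    // val rejected: Array[Class[_]] = inferred

    // With an explicit annotation, each element is checked against
    // Class[_] directly and the assignment compiles:
    val accepted: Array[Class[_]] = Array(classOf[String], classOf[Int])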
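
    Applied to the code in the question, the fix is a one-line change. An abbreviated sketch (keep the full class list from the question where the comment elides it):

    def getKryoConfig(): Array[Class[_]] = {
      // The explicit Array[Class[_]] annotation on conf is the fix: each
      // element is now checked against Class[_] instead of contributing
      // to an inferred least-upper-bound existential type.
      val conf: Array[Class[_]] = Array(
        classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
        classOf[org.apache.spark.sql.types.StructType],
        classOf[Array[org.apache.spark.sql.types.StructType]],
        Class.forName("org.apache.spark.sql.types.StringType$")
        // ... remaining classes exactly as in the question
      )
      conf
    }

    With this change, conf_spark.registerKryoClasses(getKryoConfig()) compiles, since registerKryoClasses expects an Array[Class[_]].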