amazon-web-services, scala, apache-spark, amazon-emr, kryo

Spark Kryo deserialization of EMR-produced files fails locally


Upon upgrading our EMR version to 6.2.0 (we previously used roughly 5.0 beta) and Spark 3.0.1, we noticed that we could no longer locally read Kryo files written from EMR clusters (this worked previously). When trying to read such a file, the exception thrown is along the lines of:

com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException: scala.Tuple3 cannot be cast to scala.Tuple2

We use Spark 3.0.1 with its bundled Kryo 4.0.2, and read Kryo files using Kryo::readClassAndObject, operating on an RDD read via SparkContext::sequenceFile.
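For reference, the read path looks roughly like this (a sketch, not our exact code; the path, the key/value Writable types, and the element type `T` are illustrative assumptions):

```scala
import com.esotericsoftware.kryo.io.Input
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer

// Sketch: a sequence file of Kryo-serialized byte blobs,
// each record decoded with Kryo#readClassAndObject.
def readKryoFile[T](sc: SparkContext, path: String): RDD[T] = {
  val conf = sc.getConf
  sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable])
    .mapPartitions { records =>
      // One Kryo instance per partition; Kryo instances are not thread-safe.
      val kryo = new KryoSerializer(conf).newKryo()
      records.map { case (_, bytes) =>
        kryo.readClassAndObject(new Input(bytes.copyBytes())).asInstanceOf[T]
      }
    }
}
```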


Solution

  • TL;DR: AWS EMR 6.2.0 (possibly earlier versions too) causes local deserialization of Kryo files written from EMR clusters to fail, because the clusters run an AWS fork of Spark. The code that fixes this is attached at the end of the post.


    As of recently, Amazon EMR clusters run their own fork of Apache Spark (for EMR 6.2.0 clusters, the Spark version is 3.0.1.amzn-0), with Kryo included as the default serialization framework, which we use ourselves. Ever since upgrading to 6.2.0, we noticed we could not locally read Kryo files written from EMR 6.2.0 clusters; they would fail with a message along the lines of:

    com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException: scala.Tuple3 cannot be cast to scala.Tuple2
    

    The RDD we were attempting to read was indeed an RDD of Tuple2, yet when deserializing, Kryo apparently believed it was encoded as an RDD of Tuple3.

    Now, internally, Kryo holds an ID <-> class mapping that is built at runtime and is used to decide which class to deserialize to; it must therefore be consistent between the reading and writing JVMs. This registry is built upon instantiation of the Kryo instance (we use org.apache.spark.serializer.KryoSerializer::newKryo).

    Upon inspection, we found that the IDs of Tuple2 indeed differed between the EMR cluster performing the serialization and our local machines. The difference was attributable to a single class that exists in the EMR setup but not locally: org.apache.spark.scheduler.HighlyCompressedMapStatus$CompressedSizes, which does not appear in any publicly available Spark code, so we attributed it to the Amazon Spark fork. That class is registered at ID 13 upon creation of the Kryo instance (this may obviously change in later releases), shifting the IDs of almost all subsequently registered classes by one. Since it is impossible to use that fork of Spark locally, this effectively means we are unable to locally read almost any class written by an EMR cluster.
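To make the shift concrete, here is a small self-contained illustration (plain Scala, not the real Kryo API; the registration order and all names are made up) of how one extra registration on the writer's side makes the reader resolve Tuple2's ID to Tuple3:

```scala
// Toy model of Kryo's sequential ID <-> class registry (NOT the real API).
object RegistrationShift {
  // Hypothetical registration order shared by both JVMs.
  private val common = Vector("byte[]", "scala.Tuple1", "scala.Tuple2", "scala.Tuple3")

  // The EMR fork registers one extra class before the tuples,
  // mirroring CompressedSizes landing at ID 13 on real clusters.
  val emrTable: Vector[String] =
    common.take(2) ++ Vector("HighlyCompressedMapStatus$CompressedSizes") ++ common.drop(2)
  val localTable: Vector[String] = common

  // The writer stamps each record with its own ID for the class...
  val writtenId: Int = emrTable.indexOf("scala.Tuple2")
  // ...and the local reader resolves that ID against its own table.
  val readAs: String = localTable(writtenId)

  def main(args: Array[String]): Unit =
    println(s"EMR wrote Tuple2 as ID $writtenId; the local reader resolves that ID to $readAs")
}
```

In this toy model the local reader resolves the written ID to the class one slot past Tuple2, which is exactly the "Tuple3 cannot be cast to Tuple2" failure above.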

    The ugly fix here is to go through the ClassResolver of the Kryo instance: if the CompressedSizes class is absent from the registry, we re-register every class with ID x >= 13 at ID x + 1. This is really ugly, but as a local fix it works. Obviously, it may also break with new releases of EMR/Kryo/Spark, so be extremely careful with it (we only use this locally for debugging, which is still a lot).

    Code: Previously, we would create the Kryo instance like this:

    val kryoSerializer = new KryoSerializer(sc.getConf)
    val kryo = kryoSerializer.newKryo()
    

    Now, we use this:

    val kryoSerializer = new KryoSerializer(sc.getConf)
    val kryo = adjustRegistrationsForEmrSpark(kryoSerializer.newKryo())
    

    where

    private def adjustRegistrationsForEmrSpark(kryo: Kryo): Kryo = {
        val existingRegistrations = getRegistrations(kryo)
        val emrSpecificClassExists = existingRegistrations.exists(_.getType.getName.contains("CompressedSizes"))
        if (emrSpecificClassExists) {
            println("detected emr-specific class when creating kryo, not making any adjustments")
            kryo
        } else {
            println("emr-specific class missing from registrations, adjusting existing classes by an offset of 1 to compensate")
            val classResolver = kryo.getClassResolver
            // CompressedSizes is registered at ID 13 on EMR, so every local
            // registration from ID 13 onwards is off by one relative to the writer.
            existingRegistrations.filter(_.getId >= 13).foreach { registration =>
                val toRegister = new Registration(registration.getType, registration.getSerializer, registration.getId + 1)
                classResolver.register(toRegister)
            }
            kryo
        }
    }
    
    private def getRegistrations(kryo: Kryo): List[Registration] = {
        // IDs assigned by newKryo() are sequential, so walk upwards
        // from 0 until the first unregistered ID.
        Iterator.from(0)
            .map(kryo.getClassResolver.getRegistration)
            .takeWhile(_ != null)
            .toList
    }