scala kryo

Migrating from Chill 0.6.0 (Kryo 2.21) to 0.9.5 (Kryo 4.0.2) and deserializing old messages


We use Chill-bijection to serialize and deserialize messages to and from Kafka with Kryo. The old version of our application uses Chill 0.6.0, which depends on com.esotericsoftware.kryo.kryo-2.21.jar, and the new version uses Chill 0.9.5, which depends on com.esotericsoftware.kryo-shaded-4.0.2.jar.

To minimize downtime, the new version of our application needs to be able to read messages written by the old version, but deserialization fails with this error:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition prod_2x02_external_entity_updates-0 at offset 8764198. If needed, please seek past the record to continue consumption.
Caused by: com.twitter.bijection.InversionFailure: Failed to invert: [B@14122f45
    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:43)
    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:42)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at scala.util.Failure.recoverWith(Try.scala:236)
    at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
    at com.X.backend.serialization.CustomKafkaKryoDeserializer.deserialize(KafkaKryoSerialization.scala:38)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at com.X.common.kafka.KafkaSubscriber$$anonfun$brokerFound$1.applyOrElse(KafkaSubscriber.scala:163)
    at akka.actor.Actor.aroundReceive(Actor.scala:535)
    at akka.actor.Actor.aroundReceive$(Actor.scala:533)
    at com.X.common.kafka.KafkaSubscriber.aroundReceive(KafkaSubscriber.scala:29)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.X.backend.DashboardExternalEntities$ExtMessage
Serialization trace:
entity (com.X.backend.QueueMessageProtocol$ExternalEntityUpdated)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at com.twitter.chill.SerDeState.readClassAndObject(SerDeState.java:61)
    at com.twitter.chill.KryoPool.fromBytes(KryoPool.java:94)
    at com.X.backend.serialization.CustomKafkaKryoDeserializer.$anonfun$deserialize$1(KafkaKryoSerialization.scala:38)
    at com.twitter.bijection.Inversion$.$anonfun$attempt$1(Inversion.scala:32)
    at scala.util.Try$.apply(Try.scala:213)
    ... 25 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.X.backend.DashboardExternalEntities$ExtMessage
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
    ... 35 common frames omitted

Based on the Kryo 4.0.0 release notes (https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-4.0.0), I implemented a custom ScalaKryoInstantiator and the corresponding classes to call setOptimizedGenerics(true):

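import java.util

import com.twitter.bijection.Inversion
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator, KryoBase, KryoPool, ScalaKryoInstantiator}
// Assumption: KafkaDeserializer is an alias for Kafka's Deserializer interface.
import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer}

// Kafka deserializer that decodes message bytes with a pooled, customized Kryo instance.
class CustomKafkaKryoDeserializer[M <: AnyRef] extends KafkaDeserializer[M] {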
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def close(): Unit = ()

  override def deserialize(topic: String, data: Array[Byte]): M = {
    Inversion.attempt(data) {
      CustomKryoInstantiator.defaultPool.fromBytes(_)
    }.get.asInstanceOf[M]
  }
}

object CustomKryoInstantiator extends ScalaKryoInstantiator {
  private val mutex = new AnyRef with Serializable // some serializable object
  @transient private var kpool: Option[KryoPool] = None

  def defaultPool: KryoPool = mutex.synchronized {
    if (kpool.isEmpty) {
      kpool = Some(KryoPool.withByteArrayOutputStream(guessThreads, new CustomKryoInstantiator))
    }
    kpool.get
  }

  private def guessThreads: Int = {
    val cores = Runtime.getRuntime.availableProcessors
    val GUESS_THREADS_PER_CORE = 4
    GUESS_THREADS_PER_CORE * cores
  }
}

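class CustomKryoInstantiator extends EmptyScalaKryoInstantiator {
  override def newKryo: KryoBase = {
    val k = super.newKryo
    // Kryo 4 turns optimized generics off by default (see the 4.0.0 release notes);
    // turn it back on in an attempt to match the format written by Kryo 2.21.
    k.getFieldSerializerConfig.setOptimizedGenerics(true)
    // Register Chill's serializers for Scala types.
    val reg = new AllScalaRegistrar
    reg(k)
    k
  }
}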

However, I still get the same error. Is there a way to read messages serialized by Kryo 2.21 with Kryo 4.0.2? The message classes themselves have not changed.


Solution

  • It turned out that the message packages had been renamed, so Kryo was unable to find the classes named in the old messages.

    Still, even after the package renaming was reverted, Kryo 4.0.2 and 3.0.3 were unable to deserialize messages that had been serialized with Kryo 2.21.

    In conclusion, we decided to replace Kryo with Protobuf and to write a MirrorMakerMessageHandler that converts Kafka messages from Chill-bijection 0.6.0 (Kryo 2.21) to Protobuf.
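To illustrate the package-rename part of the problem: the stack trace shows DefaultClassResolver.readName calling Class.forName on a class name stored in the message, so a renamed package makes that lookup fail. The following is only a minimal sketch of mapping old names to current ones before loading. The object ClassNameRemapper, its methods, and the package prefixes are hypothetical (the actual renames are not given in the post), and hooking such a mapping into Kryo is not shown. It would address only the ClassNotFoundException, not the format incompatibility between Kryo 2.21 and 4.0.2 described above.

```scala
import scala.util.Try

// Hypothetical helper: translates class names written by the old application
// into the names the classes have in the new application.
object ClassNameRemapper {
  // Assumed old -> new package prefixes, for illustration only.
  val renamedPrefixes: Map[String, String] = Map(
    "com.example.oldpkg." -> "com.example.newpkg."
  )

  // Rewrites the first matching prefix; names without a match are returned unchanged.
  def remap(writtenName: String, renames: Map[String, String] = renamedPrefixes): String =
    renames.collectFirst {
      case (oldPrefix, newPrefix) if writtenName.startsWith(oldPrefix) =>
        newPrefix + writtenName.stripPrefix(oldPrefix)
    }.getOrElse(writtenName)

  // Tries to load the remapped class by name; returns None if it cannot be loaded.
  def resolve(writtenName: String, renames: Map[String, String] = renamedPrefixes): Option[Class[_]] =
    Try(Class.forName(remap(writtenName, renames))).toOption
}
```

Calling ClassNameRemapper.resolve with a class name taken from a failing message gives a quick check of whether the name, after remapping, can be loaded in the new application at all.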