I am seeing an error when running my Spark job, relating to serialization of a protobuf field while transforming an RDD.
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: otherAuthors_ (com.thomsonreuters.kraken.medusa.dbor.proto.Book$DBBooks)
The error seems to originate at this point:
val booksPerTier: Iterable[(TimeTier, RDD[DBBooks])] = allTiers.map { tier =>
  (tier, books
    .filter(b => isInTier(endOfInterval, tier, b) && !isBookPublished(b))
    .mapPartitions(it => it.map(ord =>
      (ord.getAuthor, ord.getPublisherName, getGenre(ord.getSourceCountry)))))
}
val averagesPerAuthor = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherCompanyComparison, o._3)).countByValue()
}
val averagesPerPublisher = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherComparison(o._2), o._3)).countByValue()
}
The field is a list, which the generated protobuf code initialises as below:
otherAuthors_ = java.util.Collections.emptyList()
As you can see, the code is not actually using that field from the Book protobuf, although it is still being transmitted over the network.
Has anyone got any advice on this?
OK, old question, but here is an answer for future generations. The default Kryo serializers don't work well with some collections, in particular immutable ones such as java.util.Collections.emptyList() that protobuf-generated classes use. There is a third-party library that provides serializers for them: kryo-serializers
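If you build with sbt, pulling the library in looks roughly like this (the coordinates are the library's published ones; the version below is only illustrative, so pick whichever matches the Kryo version your Spark ships with):

// build.sbt
libraryDependencies += "de.javakaffee" % "kryo-serializers" % "0.42"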
In your case you probably need to provide a custom Kryo registrator when creating the Spark config:
val conf = new SparkConf()
// Kryo must be the active serializer for the registrator to be picked up
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "MyKryoRegistrator")
With the needed custom registrations in your registrator:
import java.util.Collections
import com.esotericsoftware.kryo.Kryo
import de.javakaffee.kryoserializers.CollectionsEmptyListSerializer
import de.javakaffee.kryoserializers.protobuf.ProtobufSerializer
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Collections.emptyList() is immutable, so register the dedicated serializer for it
    kryo.register(Collections.EMPTY_LIST.getClass, new CollectionsEmptyListSerializer())
    // Probably should use the protobuf serializer for your proto classes
    kryo.register(classOf[Book], new ProtobufSerializer())
  }
}
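To sanity-check the registration without submitting a full job, you can round-trip an object through Spark's KryoSerializer directly. This is just a sketch: book stands for an instance of whichever proto class you register, and the registrator should be referenced by its fully-qualified name:

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyKryoRegistrator") // fully-qualified name of your registrator
val ser = new KryoSerializer(conf).newInstance()

// book is a placeholder for a built proto message containing the otherAuthors_ field;
// if the registration is wrong, this deserialize fails the same way the job does.
val copy = ser.deserialize[Book](ser.serialize(book))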