I'm struggling to create a generic AvroSerde in Scala. I will be using this serde with Flink, so the serde itself must also be serializable. Avro doesn't have any native support for Scala, but there are some libraries that convert case classes to generic records using shapeless. Note: this generic serializer will only be instantiated with case classes.
Firstly, I tried to implement this serde using Avro4s. I got it to compile pretty easily by ensuring that the generic type was context bound to FromRecord and RecordFrom. However, neither FromRecord nor RecordFrom is serializable, so I can't use this serde in Flink.
Currently, I'm trying a different library, shapeless-datatype, which also uses shapeless. My current code looks like this:
class Serializer[T : TypeTag : ClassTag] {
  //Get type of the class at run time
  val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
  //Get Avro Type
  val avroType = AvroType[T]

  def serialize(value : T) : Array[Byte] = {
    var schema: Schema = null
    if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
      schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
    } else {
      schema = ReflectData.get().getSchema(inputClassType)
    }
    val out: ByteArrayOutputStream = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    var writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)
    val genericRecord = avroType.toGenericRecord(value)
    writer.write(genericRecord, encoder)
    encoder.flush()
    out.close()
    out.toByteArray
  }

  def deserialize(message: Array[Byte]) : T = {
    var schema: Schema = null
    if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
      schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
    } else {
      schema = ReflectData.get().getSchema(inputClassType)
    }
    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(message, null)
    avroType.fromGenericRecord(datumReader.read(null, decoder)).get
  }
}
So basically I create an AvroType[T], which has two methods, fromGenericRecord and toGenericRecord (source). Those methods require some implicits: LabelledGeneric.Aux[A, L], ToAvroRecord[L], tt: TypeTag[A] and fromL: FromAvroRecord[L].
Currently this code fails to compile because those implicits are missing:
Error:(48, 51) could not find implicit value for parameter gen: shapeless.LabelledGeneric.Aux[T,L]
val genericRecord = avroType.toGenericRecord(value)
Simply adding the implicit parameters of toGenericRecord and fromGenericRecord to my own methods doesn't solve it, because then I need to parameterize serialize[L <: HList] and deserialize[L <: HList], which I can't do because Flink doesn't allow these methods to take type parameters.
I have too little experience with both shapeless and implicits to understand which context bounds I need to solve this while also keeping this class serializable.
Hope someone can help or point me to some useful resources.
Thanks, Wouter
EDIT
I can't pass implicits through the methods or give the methods type parameters, since I need to base the serde on Flink's serialization interfaces, which force me to override byte[] serialize(T element) and T deserialize(byte[] message).
If I try to pass the implicits to the class itself, I would need to change it to:
class Serializer[T : TypeTag : ClassTag, L <: HList](implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L], fromL: FromAvroRecord[L])
but then if I instantiate it like this:
case class Test(str: String)
val serializer = new Serializer[Test]
I get this compile error:
Error:(29, 26) wrong number of type arguments for shapeless.datatype.avro.Serializer, should be 2
val serializer = new Serializer[Test]
You should make Serializer a type class. (By the way, using vars without necessity is bad practice.)
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.ReflectData
import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import shapeless.datatype.avro.{AvroType, FromAvroRecord, ToAvroRecord}
import shapeless.{HList, LabelledGeneric}
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.{ClassTag, classTag}

trait Serializer[T] extends SerializationSchema[T] with DeserializationSchema[T] {
  type L <: HList
}

object Serializer {
  type Aux[T, L0 <: HList] = Serializer[T] { type L = L0 }

  def apply[T](implicit serializer: Serializer[T]): Serializer[T] = serializer

  implicit def mkSerializer[T : ClassTag : TypeTag, L0 <: HList](implicit
    gen: LabelledGeneric.Aux[T, L0],
    toL: ToAvroRecord[L0],
    fromL: FromAvroRecord[L0]): Aux[T, L0] =
    new Serializer[T] {
      type L = L0

      //Get type of the class at run time
      val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
      //Get Avro Type
      val avroType = AvroType[T]

      override def serialize(value : T) : Array[Byte] = {
        val schema: Schema =
          if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
            inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
          else ReflectData.get().getSchema(inputClassType)
        val out: ByteArrayOutputStream = new ByteArrayOutputStream()
        val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
        val writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)
        val genericRecord = avroType.toGenericRecord(value)
        writer.write(genericRecord, encoder)
        encoder.flush()
        out.close()
        out.toByteArray
      }

      override def deserialize(message: Array[Byte]) : T = {
        val schema: Schema =
          if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
            inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
          else ReflectData.get().getSchema(inputClassType)
        val datumReader = new GenericDatumReader[GenericRecord](schema)
        val decoder = DecoderFactory.get().binaryDecoder(message, null)
        avroType.fromGenericRecord(datumReader.read(null, decoder)).get
      }

      override def isEndOfStream(nextElement: T): Boolean = ???
      override def getProducedType: TypeInformation[T] = ???
    }
}

case class Test(str: String)
val serializer = Serializer[Test]
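To see why Serializer[Test] now compiles with a single type argument, the pattern above can be reduced to a sketch that uses only the standard library. Everything in it is a hypothetical stand-in, not the shapeless or shapeless-datatype API: Generic plays the role of LabelledGeneric.Aux[T, L], Codec plays the role of ToAvroRecord[L] and FromAvroRecord[L], and Serde plays the role of the Flink schema interfaces. The stand-ins extend Serializable on the assumption that whatever the instance captures gets Java-serialized along with it; whether the real shapeless-datatype instances satisfy that is not checked here.

```scala
// Hypothetical stand-in for shapeless.LabelledGeneric.Aux[T, L]:
// ties a type T to a representation type Repr.
trait Generic[T] extends Serializable {
  type Repr
  def to(t: T): Repr
  def from(r: Repr): T
}
object Generic {
  type Aux[T, R] = Generic[T] { type Repr = R }
}

// Hypothetical stand-in for ToAvroRecord[L] / FromAvroRecord[L].
trait Codec[R] extends Serializable {
  def encode(r: R): Array[Byte]
  def decode(bytes: Array[Byte]): R
}
object Codec {
  implicit val stringCodec: Codec[String] = new Codec[String] {
    def encode(r: String): Array[Byte] = r.getBytes("UTF-8")
    def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
  }
}

// Methods without type parameters, the shape the Flink interfaces require.
trait Serde[T] extends Serializable {
  def serialize(value: T): Array[Byte]
  def deserialize(message: Array[Byte]): T
}
object Serde {
  // Summoner: the caller names only T.
  def apply[T](implicit serde: Serde[T]): Serde[T] = serde

  // R is never written by the caller; the compiler infers it while resolving gen,
  // then uses it to look up codec.
  implicit def mkSerde[T, R](implicit gen: Generic.Aux[T, R], codec: Codec[R]): Serde[T] =
    new Serde[T] {
      def serialize(value: T): Array[Byte] = codec.encode(gen.to(value))
      def deserialize(message: Array[Byte]): T = gen.from(codec.decode(message))
    }
}

case class Test(str: String)
object Test {
  // Written by hand for this sketch; shapeless derives the equivalent for case classes.
  implicit val gen: Generic.Aux[Test, String] = new Generic[Test] {
    type Repr = String
    def to(t: Test): String = t.str
    def from(r: String): Test = Test(r)
  }
}

object Demo extends App {
  val serde = Serde[Test]
  val roundTripped = serde.deserialize(serde.serialize(Test("hello")))
  println(roundTripped) // prints Test(hello)
}
```

The second type parameter moves from the class, where the caller would have to spell it out, to the implicit method, where the compiler fills it in. The implicits are captured once when the instance is created, so serialize and deserialize need neither type parameters nor implicit parameter lists.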