
Generic Avro Serde using shapeless-datatype


I'm struggling to create a generic Avro serde in Scala. I will use this serde with Flink, so the serde itself must also be serializable. Avro has no native support for Scala; however, there are some libraries that convert case classes to generic records using shapeless. Note: this generic serializer will only be instantiated with case classes.

First, I tried to implement this serde using Avro4s. It compiled easily once I context-bounded the generic type to FromRecord and ToRecord; however, neither FromRecord nor ToRecord is serializable, so I can't use this serde in Flink.
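A quick way to check whether a serde, together with every implicit instance it stores, can be shipped by Flink is to round-trip it through plain Java serialization, which is what Flink uses for the user functions and schemas of a job before sending them to the task managers. The sketch below uses only the JDK; PlainCodec and CodecSerde are hypothetical stand-ins for a non-serializable type class and a serde that captures it, not Avro4s types.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

// Writes `obj` with Java serialization and reads it back.
// Returns false if some object in the graph is not Serializable.
def survivesJavaSerialization(obj: AnyRef): Boolean =
  try {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject()
    in.close()
    true
  } catch {
    case _: NotSerializableException => false
  }

// Hypothetical codec type class that does NOT extend Serializable.
trait PlainCodec[T] { def encode(t: T): Array[Byte] }

// The serde extends Serializable, but it keeps the codec in a field,
// so serializing the serde also tries to serialize the codec.
class CodecSerde[T](val codec: PlainCodec[T]) extends Serializable {
  def serialize(value: T): Array[Byte] = codec.encode(value)
}

// Anonymous instance of the non-serializable codec.
val utf8Codec: PlainCodec[String] = new PlainCodec[String] {
  def encode(t: String): Array[Byte] = t.getBytes(UTF_8)
}
```

CodecSerde fails this check even though it extends Serializable: every implicit a serde holds on to has to be serializable as well.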

Currently I'm trying a different library, shapeless-datatype, which also uses shapeless. My current code looks like this:

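import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.ReflectData
import org.apache.avro.specific.SpecificRecordBase
import shapeless.datatype.avro.AvroType
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.{ClassTag, classTag}

class Serializer[T : TypeTag : ClassTag] {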

  //Get type of the class at run time
  val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]

  //Get Avro Type
  val avroType = AvroType[T]

  def serialize(value : T) : Array[Byte] = {
    var schema: Schema = null

    if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
      schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
    } else {
      schema = ReflectData.get().getSchema(inputClassType)
    }

    val out: ByteArrayOutputStream = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    var writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)

    val genericRecord = avroType.toGenericRecord(value)

    writer.write(genericRecord, encoder)
    encoder.flush()
    out.close()

    out.toByteArray
  }

  def deserialize(message: Array[Byte]) : T = {
    var schema: Schema = null

    if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
      schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
    } else {
      schema = ReflectData.get().getSchema(inputClassType)
    }

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(message, null)

    avroType.fromGenericRecord(datumReader.read(null, decoder)).get
  }


}

So basically I create an AvroType[T], which has two methods, fromGenericRecord and toGenericRecord. Those methods require several implicits: LabelledGeneric.Aux[A, L], ToAvroRecord[L], tt: TypeTag[A] and fromL: FromAvroRecord[L].

Currently this code fails to compile because those implicits are missing:

Error:(48, 51) could not find implicit value for parameter gen: shapeless.LabelledGeneric.Aux[T,L]
  val genericRecord = avroType.toGenericRecord(value)

Simply copying the implicit parameters of toGenericRecord and fromGenericRecord onto my own methods doesn't solve it, because then I need to parameterize serialize[L <: HList] and deserialize[L <: HList], which I can't do because Flink doesn't allow these methods to have type parameters.

I have too little experience with shapeless and implicits to work out which context bounds I need to solve this while also keeping the class serializable.

Hope someone can help or point me to some useful resources.

Thanks, Wouter

EDIT

I can't pass implicits through the methods or give them type parameters, because the serde has to implement Flink's serialization interfaces, which force me to override byte[] serialize(T element) and T deserialize(byte[] message).

If I try to pass the implicits to the class itself, I would need to change it to:

class Serializer[T : TypeTag : ClassTag, L <: HList](implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L], fromL: FromAvroRecord[L])

but then if I instantiate it like this:

case class Test(str: String)
val serializer = new Serializer[Test]

I get this compile error:

Error:(29, 26) wrong number of type arguments for shapeless.datatype.avro.Serializer, should be 2
val serializer = new Serializer[Test]
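The error arises because both T and L must be spelled out at the call site. If you want to keep the two-parameter class, one common workaround is a partially applied factory: the caller fixes T, and the compiler infers L while resolving the implicits. The sketch below runs without shapeless or Avro; Gen (standing in for LabelledGeneric), Enc (standing in for ToAvroRecord) and Serializer.of are hypothetical names, not shapeless-datatype's API.

```scala
// Hypothetical stand-in for shapeless' LabelledGeneric.
trait Gen[A] { type Repr; def to(a: A): Repr }
object Gen { type Aux[A, R] = Gen[A] { type Repr = R } }

// Hypothetical stand-in for ToAvroRecord[L].
trait Enc[R] { def encode(r: R): String }
object Enc {
  implicit val stringEnc: Enc[String] = new Enc[String] { def encode(r: String): String = r }
}

// Same shape as the class in the question: T and L are both type parameters.
class Serializer[T, L](implicit gen: Gen.Aux[T, L], enc: Enc[L]) {
  def serialize(value: T): String = enc.encode(gen.to(value))
}

object Serializer {
  // Partially applied constructor: the caller supplies T,
  // and L is inferred while the compiler searches for Gen.Aux[T, L].
  final class Of[T] {
    def apply[L]()(implicit gen: Gen.Aux[T, L], enc: Enc[L]): Serializer[T, L] =
      new Serializer[T, L]
  }
  def of[T]: Of[T] = new Of[T]
}

case class Test(str: String)
object Test {
  // Hand-written instance; shapeless would derive LabelledGeneric with a macro.
  implicit val gen: Gen.Aux[Test, String] = new Gen[Test] {
    type Repr = String
    def to(a: Test): String = a.str
  }
}

// Only Test is written; L = String is inferred.
val serializer: Serializer[Test, String] = Serializer.of[Test]()
```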

Solution

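  • You should make Serializer a type class. Because its implicit constructor mkSerializer asks for LabelledGeneric.Aux[T, L0], the compiler infers L0 while resolving the implicit, so callers only write Serializer[Test] and serialize/deserialize need no type parameters. (By the way, using vars unnecessarily is bad practice.)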

    import java.io.ByteArrayOutputStream
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
    import org.apache.avro.io.{BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
    import org.apache.avro.reflect.ReflectData
    import org.apache.avro.specific.SpecificRecordBase
    import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import shapeless.datatype.avro.{AvroType, FromAvroRecord, ToAvroRecord}
    import shapeless.{HList, LabelledGeneric}  
    import scala.reflect.runtime.universe.TypeTag
    import scala.reflect.{ClassTag, classTag}
    
    trait Serializer[T] extends SerializationSchema[T] with DeserializationSchema[T] {
      type L <: HList
    }
    
    object Serializer {
      type Aux[T, L0 <: HList] = Serializer[T] { type L = L0 }
    
      def apply[T](implicit serializer: Serializer[T]): Serializer[T] = serializer
    
      implicit def mkSerializer[T : ClassTag : TypeTag, L0 <: HList](implicit
        gen: LabelledGeneric.Aux[T, L0],
        toL: ToAvroRecord[L0],
        fromL: FromAvroRecord[L0]): Aux[T, L0] =
        new Serializer[T] {
          type L = L0
    
          //Get type of the class at run time
          val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
    
          //Get Avro Type
          val avroType = AvroType[T]
    
          override def serialize(value : T) : Array[Byte] = {
            val schema: Schema =
              if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
                inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
              else ReflectData.get().getSchema(inputClassType)
    
            val out: ByteArrayOutputStream = new ByteArrayOutputStream()
            val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
            val writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)
    
            val genericRecord = avroType.toGenericRecord(value)
    
            writer.write(genericRecord, encoder)
            encoder.flush()
            out.close()
    
            out.toByteArray
          }
    
          override def deserialize(message: Array[Byte]) : T = {
            val schema: Schema =
              if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
                inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
              else ReflectData.get().getSchema(inputClassType)
    
            val datumReader = new GenericDatumReader[GenericRecord](schema)
            val decoder = DecoderFactory.get().binaryDecoder(message, null)
    
            avroType.fromGenericRecord(datumReader.read(null, decoder)).get
          }
    
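          // Never signal end-of-stream: the input is treated as unbounded
          override def isEndOfStream(nextElement: T): Boolean = false
    
          // Type information extracted from the runtime class of T
          override def getProducedType: TypeInformation[T] = TypeInformation.of(inputClassType)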
        }
    }
    
    case class Test(str: String)    
    val serializer = Serializer[Test]
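Serializer[Test] compiles here because of the Aux pattern: mkSerializer's result type Aux[T, L0] leaves L0 free, and implicit search fixes it from LabelledGeneric.Aux[T, L0]. Below is a dependency-free sketch of the same mechanics, assuming hypothetical type classes Gen (in place of LabelledGeneric) and Enc (in place of ToAvroRecord/FromAvroRecord); the byte format is plain UTF-8, not Avro.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for shapeless' LabelledGeneric.
trait Gen[A] extends Serializable {
  type Repr
  def to(a: A): Repr
  def from(r: Repr): A
}
object Gen { type Aux[A, R] = Gen[A] { type Repr = R } }

// Hypothetical stand-in for ToAvroRecord/FromAvroRecord.
trait Enc[R] extends Serializable {
  def encode(r: R): Array[Byte]
  def decode(bytes: Array[Byte]): R
}
object Enc {
  implicit val utf8String: Enc[String] = new Enc[String] {
    def encode(r: String): Array[Byte] = r.getBytes(UTF_8)
    def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
  }
}

// Type class whose methods have no type parameters, like Flink's interfaces.
trait Serde[T] extends Serializable {
  type L
  def serialize(value: T): Array[Byte]
  def deserialize(message: Array[Byte]): T
}
object Serde {
  type Aux[T, L0] = Serde[T] { type L = L0 }

  // Summoner, so callers can write Serde[Test].
  def apply[T](implicit serde: Serde[T]): Serde[T] = serde

  // L0 is never written by the caller; implicit search infers it from Gen.Aux[T, L0].
  implicit def mkSerde[T, L0](implicit gen: Gen.Aux[T, L0], enc: Enc[L0]): Aux[T, L0] =
    new Serde[T] {
      type L = L0
      def serialize(value: T): Array[Byte] = enc.encode(gen.to(value))
      def deserialize(message: Array[Byte]): T = gen.from(enc.decode(message))
    }
}

case class Test(str: String)
object Test {
  // Hand-written instance; shapeless derives LabelledGeneric with a macro instead.
  implicit val gen: Gen.Aux[Test, String] = new Gen[Test] {
    type Repr = String
    def to(a: Test): String = a.str
    def from(r: String): Test = Test(r)
  }
}

val serde = Serde[Test]
```

Gen, Enc and Serde all extend Serializable here, mirroring the requirement that every implicit the Flink serde captures must be serializable too.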