
Scala, how to simplify or reuse side-effecting pattern matching logic?


I am looking for a way to refactor this code and make it cleaner. I am certain that there is a way but haven't been able to figure it out.

I am working with avro4s, I need to enable serialisation for Kafka and I have two criteria:

The solution I came up with is creating two serialisers that take the parent trait as input, and in each of them I first pattern match on the input and then choose a binary or json serialization.

In the code below, the key issues are those AvroOutputStream.binary[T] or AvroOutputStream.json[T]. I can't find a way to reduce the code duplication.

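import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.AvroOutputStream
import org.apache.kafka.common.serialization.Serializer

trait KafkaMessage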

case class UserDeleted(name: String, age: Int) extends KafkaMessage

case class UserCreated(name: String, age: Int) extends KafkaMessage

class KafkaMessageBinarySerializer extends Serializer[KafkaMessage] with Serializable {

  override def serialize(topic: String, data: KafkaMessage): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()

    // TODO: optimise this
    val output = data match {
      case e: UserCreated =>
        val output = AvroOutputStream.binary[UserCreated].to(byteStream).build()
        output.write(e)
        output

      case e: UserDeleted =>
        val output = AvroOutputStream.binary[UserDeleted].to(byteStream).build()
        output.write(e)
        output
    }

    output.close()
    byteStream.toByteArray
  }
}

class KafkaMessageJsonSerializer extends Serializer[KafkaMessage] with Serializable {

  override def serialize(topic: String, data: KafkaMessage): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()

    // TODO: optimise this
    val output = data match {
      case e: UserCreated =>
        val output = AvroOutputStream.json[UserCreated].to(byteStream).build()
        output.write(e)
        output

      case e: UserDeleted =>
        val output = AvroOutputStream.json[UserDeleted].to(byteStream).build()
        output.write(e)
        output
    }

    output.close()
    byteStream.toByteArray
  }
}

The problem isn't so much about serialization or Kafka as it is about types and pattern matching.

My solution screams to be improved, but my fiddling around didn't produce anything nice 😔 I would love to get some advice on how to improve this.


Solution

  • Do you mean something like this:

    trait Dispatched[T] {
      type Out <: T
      val value: Out
      val builder: AvroOutputStreamBuilder[Out]
    }
    object Dispatched {
      def apply[T, T1 <: T](_value: T1, _builder: AvroOutputStreamBuilder[T1]): Dispatched[T] =
        new Dispatched[T] {
          type Out = T1
          val value = _value
          val builder = _builder
        }
    }
    
    trait CommonLogic[T] extends Serializer[T] with Serializable {
    
      protected val dispatch: T => Dispatched[T]
    
      override def serialize(topic: String, data: T): Array[Byte] = {
        val byteStream = new ByteArrayOutputStream()
        // could be made prettier with some custom extractor, but whatever
        val pair = dispatch(data)
    
        scala.util.Using(pair.builder.to(byteStream).build()) { output =>
          output.write(pair.value)
        }
    
        byteStream.toByteArray
      }
    }
    
    class KafkaMessageBinarySerializer extends CommonLogic[KafkaMessage] {
    
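      // explicit type so the pattern-matching function literal has a known parameter type
      override protected val dispatch: KafkaMessage => Dispatched[KafkaMessage] = {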
        case e: UserCreated => Dispatched(e, AvroOutputStream.binary[UserCreated])
        case e: UserDeleted => Dispatched(e, AvroOutputStream.binary[UserDeleted])
      }
    }
    
    class KafkaMessageJsonSerializer extends CommonLogic[KafkaMessage] {
    
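      // explicit type so the pattern-matching function literal has a known parameter type
      override protected val dispatch: KafkaMessage => Dispatched[KafkaMessage] = {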
        case e: UserCreated => Dispatched(e, AvroOutputStream.json[UserCreated])
        case e: UserDeleted => Dispatched(e, AvroOutputStream.json[UserDeleted])
      }
    }
    

    This logic lets you create a lot of different serializers while only having to implement dispatch in each of them. Since Dispatched uses a path-dependent type (its Out type member), the compiler guarantees that the builder matches the value you want to write with it.
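
    To see in isolation why the type member keeps value and builder in step, here is a minimal self-contained sketch. Sink and Builder are hypothetical stand-ins for avro4s' output stream and its builder (they only render strings), the Message hierarchy mirrors the question, and Scala 2.13+ is assumed for scala.util.Using. The sketch uses Using.resource instead of Using(...): the latter returns a Try, so a failed write would end up in a value nobody inspects, whereas Using.resource rethrows.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Using

object DispatchSketch {
  // Hypothetical stand-ins for the avro4s output stream and its builder.
  trait Sink[A] extends AutoCloseable { def write(a: A): Unit }
  final class Builder[A](render: A => String) {
    def to(bytes: ByteArrayOutputStream): Sink[A] = new Sink[A] {
      def write(a: A): Unit = bytes.write(render(a).getBytes(UTF_8))
      def close(): Unit = bytes.flush()
    }
  }

  sealed trait Message
  final case class UserCreated(name: String, age: Int) extends Message
  final case class UserDeleted(name: String, age: Int) extends Message

  // Same shape as Dispatched in the answer: Out ties value and builder together.
  trait Dispatched[T] {
    type Out <: T
    val value: Out
    val builder: Builder[Out]
  }
  object Dispatched {
    def apply[T, T1 <: T](v: T1, b: Builder[T1]): Dispatched[T] =
      new Dispatched[T] {
        type Out = T1
        val value: T1 = v
        val builder: Builder[T1] = b
      }
  }

  val createdBuilder = new Builder[UserCreated](e => s"created:${e.name}:${e.age}")
  val deletedBuilder = new Builder[UserDeleted](e => s"deleted:${e.name}:${e.age}")

  // Type arguments are spelled out here only to make the inference explicit.
  val dispatch: Message => Dispatched[Message] = {
    case e: UserCreated => Dispatched[Message, UserCreated](e, createdBuilder)
    case e: UserDeleted => Dispatched[Message, UserDeleted](e, deletedBuilder)
  }

  def serialize(d: Dispatched[Message]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    // d.builder is a Builder[d.Out] and d.value is a d.Out, so this type-checks
    // without the caller ever knowing which concrete subtype is inside.
    Using.resource(d.builder.to(bytes))(_.write(d.value))
    bytes.toByteArray
  }
}
```

    Passing deletedBuilder together with a UserCreated would not compile, because no single T1 satisfies both arguments. That is the guarantee the path-dependent type buys.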

    To reduce it further, you can look inside and see what these .json and .binary methods do:

    // binary
    new AvroOutputStreamBuilder[T](schema, encoder, AvroFormat.Binary)
    // json
    new AvroOutputStreamBuilder[T](schema, encoder, AvroFormat.Json)
    

    They basically pass 2 implicit parameters and hardcode 1 flag. So you could do:

    trait Dispatched[T] {
      type Out <: T
      val value: Out
      val schema: SchemaFor[Out]
      val encoder: Encoder[Out]
    }
    object Dispatched {
      def apply[T, T1 <: T: SchemaFor : Encoder](_value: T1): Dispatched[T] =
        new Dispatched[T] {
          type Out = T1
          val value = _value
          // the instances must be summoned for the concrete subtype T1, not the parent T
          val schema = implicitly[SchemaFor[T1]]
          val encoder = implicitly[Encoder[T1]]
        }
    }
    
    abstract class CommonLogic[T](format: AvroFormat) extends Serializer[T] with Serializable {
    
      protected val dispatch: T => Dispatched[T]
    
      override def serialize(topic: String, data: T): Array[Byte] = {
        val byteStream = new ByteArrayOutputStream()
        val triple = dispatch(data)
        val builder = new AvroOutputStreamBuilder(triple.schema, triple.encoder, format)
    
        scala.util.Using(builder.to(byteStream).build()) { output =>
          output.write(triple.value)
        }
    
        byteStream.toByteArray
      }
    }
    
    class KafkaMessageSerializer(format: AvroFormat) extends CommonLogic[KafkaMessage](format) {
    
      // could be derived with macros or type classes, but let's not go over the top
      override protected val dispatch: KafkaMessage => Dispatched[KafkaMessage] = {
        case e: UserCreated => Dispatched(e)
        case e: UserDeleted => Dispatched(e)
      }
    }
    
    class KafkaMessageBinarySerializer extends KafkaMessageSerializer(AvroFormat.Binary)
    
    class KafkaMessageJsonSerializer extends KafkaMessageSerializer(AvroFormat.Json)
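
    The step that makes this work is that a context bound is only sugar for implicit parameters, so the one hardcoded flag can become an ordinary argument. A small sketch of just that step follows; Format, SchemaFor, Encoder and Builder here are hypothetical stand-ins with the same roles as in the answer, not the avro4s types of the same name.

```scala
object FormatSketch {
  // Hypothetical stand-ins; only the shape matters.
  sealed trait Format
  case object Binary extends Format
  case object Json extends Format

  trait SchemaFor[A] { def name: String }
  trait Encoder[A] { def encode(a: A): String }

  final class Builder[A](val schema: SchemaFor[A], val encoder: Encoder[A], val format: Format)

  // Written with context bounds ...
  def binary[A: SchemaFor: Encoder]: Builder[A] =
    new Builder[A](implicitly[SchemaFor[A]], implicitly[Encoder[A]], Binary)

  // ... or, equivalently, with explicit implicit parameters.
  def json[A](implicit s: SchemaFor[A], e: Encoder[A]): Builder[A] =
    new Builder[A](s, e, Json)

  // Both differ only in the flag, so the flag can be passed in instead.
  def builder[A: SchemaFor: Encoder](format: Format): Builder[A] =
    new Builder[A](implicitly[SchemaFor[A]], implicitly[Encoder[A]], format)

  final case class UserCreated(name: String, age: Int)
  object UserCreated {
    // Instances live in the companion so they are always in implicit scope.
    implicit val schemaFor: SchemaFor[UserCreated] =
      new SchemaFor[UserCreated] { val name = "UserCreated" }
    implicit val encoder: Encoder[UserCreated] =
      new Encoder[UserCreated] { def encode(a: UserCreated): String = s"${a.name}:${a.age}" }
  }
}
```

    With this shape, builder[UserCreated](Json) and json[UserCreated] build the same thing, which is why a single serializer class taking the format as a constructor argument is enough.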
    

    This lets you share even more code, but at the cost of more shared infrastructure. Whether that is worth it depends on how many of these codecs you have to implement. If you need to write a lot of them, and each has a long list of subtypes to dispatch to, then something like this:

    trait Dispatcher[T] {
      def dispatch(value: T): Dispatched[T]
    }
    object Dispatcher {
      // implement derivation with Magnolia or something similar
    }
    
    abstract class CommonLogic[T: Dispatcher](format: AvroFormat) extends Serializer[T] with Serializable {
    
      // no need to implement dispatch!
      // call implicitly[Dispatcher[T]].dispatch(data) instead!
      override def serialize(topic: String, data: T): Array[Byte] = ...
    }
    
    class KafkaMessageBinarySerializer extends CommonLogic[KafkaMessage](AvroFormat.Binary)
    
    class KafkaMessageJsonSerializer extends CommonLogic[KafkaMessage](AvroFormat.Json)
    

    could make sense.

    But if there are only a few of them, I'd leave the code as is.
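
    For reference, the Dispatcher type class can be tried out without any derivation library by writing the instance by hand. This is a sketch under assumptions: Encoder is a hypothetical stand-in for the SchemaFor/Encoder pair, serialize returns a String instead of bytes to keep it short, and the hand-written match is what a Magnolia-style derivation would be expected to generate.

```scala
object DispatcherSketch {
  // Hypothetical stand-in for the SchemaFor/Encoder pair.
  trait Encoder[A] { def encode(a: A): String }

  trait Dispatched[T] {
    type Out <: T
    val value: Out
    val encoder: Encoder[Out]
  }
  object Dispatched {
    def apply[T, T1 <: T](v: T1)(implicit enc: Encoder[T1]): Dispatched[T] =
      new Dispatched[T] {
        type Out = T1
        val value: T1 = v
        val encoder: Encoder[T1] = enc
      }
  }

  trait Dispatcher[T] { def dispatch(value: T): Dispatched[T] }

  sealed trait Message
  final case class UserCreated(name: String, age: Int) extends Message
  object UserCreated {
    implicit val encoder: Encoder[UserCreated] = new Encoder[UserCreated] {
      def encode(a: UserCreated): String = s"created:${a.name}:${a.age}"
    }
  }
  final case class UserDeleted(name: String, age: Int) extends Message
  object UserDeleted {
    implicit val encoder: Encoder[UserDeleted] = new Encoder[UserDeleted] {
      def encode(a: UserDeleted): String = s"deleted:${a.name}:${a.age}"
    }
  }

  object Message {
    // Hand-written instance; a derivation would generate this match per sealed trait.
    implicit val dispatcher: Dispatcher[Message] = new Dispatcher[Message] {
      def dispatch(value: Message): Dispatched[Message] = value match {
        case e: UserCreated => Dispatched[Message, UserCreated](e)
        case e: UserDeleted => Dispatched[Message, UserDeleted](e)
      }
    }
  }

  // The shared logic only asks for a Dispatcher; it never names a subtype.
  def serialize[T: Dispatcher](data: T): String = {
    val d = implicitly[Dispatcher[T]].dispatch(data)
    d.encoder.encode(d.value)
  }
}
```

    Because the instance sits in the companion of Message, serialize[Message](...) finds it without any import, and adding a new subtype means touching only that one match.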