kotlin serialization apache-kafka-streams

Serde class for custom JSON (Kotlin & Kafka)


I am writing a Kafka Streams application in Kotlin that consumes JSON messages (no Avro or Schema Registry).

In MyMessage.kt I have declared the MyMessage class as @Serializable.

MyMessage.kt

import kotlinx.serialization.Serializable

@Serializable
data class MyMessage(val from: String, val to: String, val msg: String)

Streaming.kt

val s: KStream<String, MyMessage> = streamsBuilder()
    .stream("my", Consumed.with(Serdes.String(), Serdes.serdeFrom(MyMessage::class.java)))

When running this I get the following error on the above line:

Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID

What am I missing?


Solution

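  • Serdes.serdeFrom(Class) only works for the built-in types listed in the error message. For a custom type, use the overload Serdes.serdeFrom(Serializer, Deserializer), which expects a Serializer and a Deserializer object. Both are Kafka interfaces from the package org.apache.kafka.common.serialization and have nothing to do with the @Serializable annotation.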

    You need to create the classes MyMessageSerializer (implementing Serializer<MyMessage>) and MyMessageDeserializer (implementing Deserializer<MyMessage>) and pass an instance of each to Serdes.serdeFrom(). Inside the two classes you can implement the actual serialization and deserialization however you like, for example by delegating to the JSON support that kotlinx.serialization generates for the @Serializable class.
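
    A minimal sketch of what that could look like. It assumes kotlinx-serialization-json is on the classpath, the serialization compiler plugin is enabled (so MyMessage.serializer() exists), and kafka-clients is 2.1 or newer, where configure() and close() have default implementations. The name myMessageSerde is made up for illustration, and error handling for malformed JSON is left out.

```kotlin
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.Serializer

@Serializable
data class MyMessage(val from: String, val to: String, val msg: String)

// Kafka-side serializer: turns a MyMessage into UTF-8 encoded JSON bytes.
class MyMessageSerializer : Serializer<MyMessage> {
    override fun serialize(topic: String?, data: MyMessage?): ByteArray? =
        data?.let { Json.encodeToString(MyMessage.serializer(), it).encodeToByteArray() }
}

// Kafka-side deserializer: parses UTF-8 encoded JSON bytes back into a MyMessage.
// A null payload (for example a tombstone record) is passed through as null.
class MyMessageDeserializer : Deserializer<MyMessage> {
    override fun deserialize(topic: String?, data: ByteArray?): MyMessage? =
        data?.let { Json.decodeFromString(MyMessage.serializer(), it.decodeToString()) }
}

// Hypothetical helper name: combines both halves into a Serde via the two-argument overload.
val myMessageSerde: Serde<MyMessage> =
    Serdes.serdeFrom(MyMessageSerializer(), MyMessageDeserializer())
```

    With that in place, the stream from the question would use Consumed.with(Serdes.String(), myMessageSerde) instead of Serdes.serdeFrom(MyMessage::class.java).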