I am writting a kafka stream application in Kotlin, which consumes a JSON message (no AVRO or Schema Registry).
In MyMessage.kt I have declared the MyMessage
class as @Serializable
.
MyMessage.kt
import kotlinx.serialization.Serializable
@Serializable
data class MyMessage(val from: String, val to: String, val msg: String)
Streaming.kt
val s: KString<String, MyMessage> = streamsBuilder()
.stream("my", Consumed.with(Serdes.String, Serdes.serdeFrom(MyMessage::class.java)
When running this I get the following error on the above line:
Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID
What am I missing?
The argument for Serdes.serdesFrom()
expects a Serializer
and Deserializer
object (both interfaces are Kafka interfaces from package org.apache.kafka.common.serialization
and have nothing to do with the @Serializable
annotation.
You need to create classes MyMessageSerializer extends Serializer
and MyMessageDeserialzer extends Deserializer
and pass those object into the method.
To implement the actual serialization / deserialization with both the classes you may rely on the default serialization / deserialization if you want.