Spring Cloud Stream lets users provide a function implementation that accepts Message<T> (for example, Consumer<Message<T>>) to handle messages received from a preconfigured message broker such as Kafka. It also lets you set a deserializer that converts the byte arrays for both keys and values into something more useful to the application, offloading deserialization to the Kafka client. Under the hood, the Spring Cloud Stream integration uses the regular Kafka client, works with ConsumerRecord<K, V> directly, and maps additional fields such as topic, partition, offset, and received timestamp to headers on the message passed to the function implementation.

My question: how can I configure either Spring Cloud Stream or the Kafka deserializer integration so that the consumer record, or at least its received timestamp, is available inside the deserializer, so that the deserializer can use that value (e.g. as a "created message timestamp" of sorts)? My current workaround is to read the kafka_receivedTimestamp header in the listener, but that leaks part of the deserialization into the consumer that does the actual message processing. Code examples below:
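    package foo;

    import java.nio.ByteBuffer;
    import java.util.function.Consumer;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.springframework.messaging.Message;

    record Container(long id, long createdAt) {}

    class ContainerDeserializer implements Deserializer<Container> {
        // Required by the interface; delegates to the header-aware overload.
        public Container deserialize(String topic, byte[] data) { return deserialize(topic, null, data); }
        public Container deserialize(String topic, Headers headers, byte[] data) {
            // would like to read kafka_receivedTimestamp here
            return new Container(ByteBuffer.wrap(data).getLong(), 0L);
        }
    }

    class ContainerConsumer implements Consumer<Message<Container>> {
        public void accept(Message<Container> message) {
            Container original = message.getPayload();
            Container enriched = new Container(original.id(), message.getHeaders().get("kafka_receivedTimestamp", Long.class));
            // do something with enriched counterpart
        }
    }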
And in the YAML configuration:
    spring:
      cloud:
        stream:
          kafka:
            bindings:
              containerConsumer-in-0:
                consumer:
                  configuration:
                    spring.deserializer.value.delegate.class: foo.ContainerDeserializer
So, apparently you want a Container passed to your ContainerConsumer with the timestamp already set on that object as a constructor argument.
I would suggest not using a custom Deserializer, since those ConsumerRecord properties are not available through that contract. Instead, use the standard ByteArrayDeserializer, provide a custom MessageConverter bean, and set its name in the spring.cloud.stream.kafka.default.consumer.converterBeanName property (documented at https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/config-options.html#kafka-consumer-properties).
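As a configuration sketch, the property named above could be set as follows. The bean name containerRecordConverter is a hypothetical placeholder, and the sketch assumes the spring.deserializer.value.delegate.class entry from the question has been removed so that the payload reaches the converter as raw bytes.

```yaml
spring:
  cloud:
    stream:
      kafka:
        default:
          consumer:
            # Hypothetical bean name; must match the name of your converter bean.
            converterBeanName: containerRecordConverter
```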
This way the Kafka binder picks it up and configures the converter on the KafkaMessageDrivenChannelAdapter. You can use a standard MessagingMessageConverter and inject a custom converter through setMessagingConverter() that deserializes the bytes from a Message<byte[]> into your Container, reading the kafka_receivedTimestamp header there to populate the Container object.
In the end, you would not need to do anything in the ContainerConsumer.
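The conversion step itself can be sketched independently of Spring. The following is a minimal illustration, not the binder's actual wiring: ContainerPayloadConverter is a hypothetical helper, a plain Map stands in for Spring's MessageHeaders (which also implements Map<String, Object>), and the fallback to 0 when the header is missing is an assumption that mirrors the placeholder in the question.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Same shape as the Container record from the question.
record Container(long id, long createdAt) {}

// Hypothetical helper holding only the byte[] + headers -> Container logic.
final class ContainerPayloadConverter {

    // Header name taken from the question's workaround.
    static final String RECEIVED_TIMESTAMP = "kafka_receivedTimestamp";

    private ContainerPayloadConverter() {}

    static Container convert(byte[] payload, Map<String, Object> headers) {
        if (payload == null || payload.length < Long.BYTES) {
            throw new IllegalArgumentException("payload must contain at least 8 bytes");
        }
        // The first 8 bytes carry the id, as in the question's deserializer.
        long id = ByteBuffer.wrap(payload).getLong();
        Object timestamp = headers.get(RECEIVED_TIMESTAMP);
        // Assumption: use 0 when the header is absent or not a Long.
        long createdAt = (timestamp instanceof Long) ? (Long) timestamp : 0L;
        return new Container(id, createdAt);
    }
}
```

In a real setup, this logic would presumably be called from the custom converter passed to setMessagingConverter(), with message.getPayload() and message.getHeaders() as arguments, so that the ContainerConsumer receives an already populated Container.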