java apache-kafka spring-kafka spring-cloud-stream

How to expose either Kafka's ConsumerRecord<K, V> or Spring Cloud Stream's metadata to a deserializer?


Spring Cloud Stream lets users provide a Function<Message<T>> (or Consumer<Message<T>>) implementation to handle messages received via a preconfigured broker such as Kafka. It also permits setting a deserializer that converts the byte arrays for both keys and values into something more useful to the application, offloading deserialization to the Kafka client.

Under the hood, the Cloud Stream integration uses the regular Kafka client, interacts with ConsumerRecord<K, V> directly, and maps additional fields such as topic, partition, offset, and received timestamp as headers on the Message passed to the Function implementation.

The question is: how would one configure either Spring Cloud Stream or the Kafka deserializer integration so that the ConsumerRecord, or at least the received timestamp, is available inside the deserializer, allowing the deserializer to use that value (e.g. as a "message created" timestamp of sorts)?

My current workaround is to map the kafka_receivedTimestamp header in the listener, but that leaks part of the deserialization into the consumer that does the actual processing of the message. Code examples below:

package foo;

import java.nio.ByteBuffer;
import java.util.function.Consumer;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.messaging.Message;

record Container(long id, long createdAt) {}

class ContainerDeserializer implements Deserializer<Container> {
    @Override
    public Container deserialize(String topic, Headers headers, byte[] data) {
        // would like to read kafka_receivedTimestamp here, but only the
        // record's own Headers are available to this contract
        return new Container(ByteBuffer.wrap(data).getLong(), 0L);
    }
}

class ContainerConsumer implements Consumer<Message<Container>> {
    @Override
    public void accept(Message<Container> message) {
        Container original = message.getPayload();
        // workaround: enrich from the kafka_receivedTimestamp header in the listener
        Container enriched = new Container(original.id(),
                message.getHeaders().get("kafka_receivedTimestamp", Long.class));
        // do something with the enriched counterpart
    }
}

And in the YAML configuration:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          containerConsumer-in-0:
            consumer:
              configuration:
                spring.deserializer.value.delegate.class: foo.ContainerDeserializer

Solution

  • So, apparently you want a Container passed to your ContainerConsumer, but with the timestamp already set on that object via its constructor argument.

    I would suggest not going with a custom Deserializer, since those ConsumerRecord properties are not available to that contract. Instead, use the standard ByteArrayDeserializer and provide a custom MessageConverter bean, then use its name for the spring.cloud.stream.kafka.default.consumer.converterBeanName property: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/config-options.html#kafka-consumer-properties.

    This way the Kafka binder will pick it up and configure this converter on the KafkaMessageDrivenChannelAdapter. You can use the standard MessagingMessageConverter and inject a custom converter via setMessagingConverter() to deserialize those bytes into your Container from a Message<byte[]>, where the kafka_receivedTimestamp header is available for your Container object.

    In the end you would not need to do anything in the ContainerConsumer.
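    The suggestion above could be sketched roughly as follows. This is an unverified sketch, assuming spring-kafka's MessagingMessageConverter.setMessagingConverter(SmartMessageConverter) hook and the Container record from the question; the bean name containerMessageConverter is a made-up example:

    ```java
    package foo;

    import java.nio.ByteBuffer;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.kafka.support.converter.MessagingMessageConverter;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.converter.SmartMessageConverter;

    @Configuration
    class ContainerConverterConfig {

        // Bean name to reference from
        // spring.cloud.stream.kafka.default.consumer.converterBeanName
        @Bean
        MessagingMessageConverter containerMessageConverter() {
            MessagingMessageConverter converter = new MessagingMessageConverter();
            converter.setMessagingConverter(new SmartMessageConverter() {

                @Override
                public Object fromMessage(Message<?> message, Class<?> targetClass,
                        Object conversionHint) {
                    return fromMessage(message, targetClass);
                }

                @Override
                public Object fromMessage(Message<?> message, Class<?> targetClass) {
                    // payload is raw bytes because the value deserializer
                    // stays a ByteArrayDeserializer
                    byte[] payload = (byte[]) message.getPayload();
                    // the binder maps ConsumerRecord.timestamp() to this header
                    Long receivedTimestamp = message.getHeaders()
                            .get(KafkaHeaders.RECEIVED_TIMESTAMP, Long.class);
                    return new Container(ByteBuffer.wrap(payload).getLong(),
                            receivedTimestamp != null ? receivedTimestamp : 0L);
                }

                @Override
                public Message<?> toMessage(Object payload, MessageHeaders headers,
                        Object conversionHint) {
                    return toMessage(payload, headers);
                }

                @Override
                public Message<?> toMessage(Object payload, MessageHeaders headers) {
                    throw new UnsupportedOperationException("consume-only converter");
                }
            });
            return converter;
        }
    }
    ```

    With this in place, spring.cloud.stream.kafka.default.consumer.converterBeanName would be set to containerMessageConverter, the binding's value deserializer left as ByteArrayDeserializer, and ContainerConsumer would receive a Message<Container> whose payload already carries the received timestamp.
    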