spring-boot apache-kafka spring-kafka spring-kafka-test embedded-kafka

How to send a message with specific offset to EmbeddedKafka in spring boot?


I am writing an integration test for an application that connects to Kafka to consume and publish data, and for that purpose I am using EmbeddedKafka. Part of the logic is to consume messages with specific offsets. I want to simulate this, so my goal is to:

  1. send some messages to EmbeddedKafka, but with specific offsets
  2. consume them with the same offsets

This doesn't work: I'm sending messages with the KafkaHeaders.OFFSET header set, but it is ignored, and the message I consume afterwards has a different offset. In fact, the offsets simply start at 0 and are incremented by one for each message.

MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(payload)
        .setHeader(KafkaHeaders.TOPIC, topic)
        .setHeader(KafkaHeaders.MESSAGE_KEY, key)
        .setHeader(KafkaHeaders.PARTITION_ID, partition)
        .setHeader(KafkaHeaders.OFFSET, val);
kafkaTemplate.send(messageBuilder.build());

The KafkaTemplate<String, String> is initialised in the standard way. On the other end, I consume the messages like this:

private class MessagesWithOffsetsConsumer implements BatchMessageListener<String, String>, ConsumerSeekAware {

    MessagesWithOffsetsConsumer() {
    }

    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records) {

        records.forEach(record -> {
            String id = record.key();
            String dataAssetPayload = record.value();
            int partitionId = record.partition();

            LOGGER.info("Received record: {} offset: {}", id, record.offset());
        });
    }
}

In short, the offset of a message received in onMessage is not the one I set when building the message.

Is there any way to achieve this?


Solution

  • No, there is no such thing as an explicit offset on a ProducerRecord. These are the only properties you can set:

    public class ProducerRecord<K, V> {
    
        private final String topic;
        private final Integer partition;
        private final Headers headers;
        private final K key;
        private final V value;
        private final Long timestamp;
    

    And those are exactly the fields KafkaTemplate fills in when it builds the ProducerRecord from your Message. KafkaHeaders.OFFSET, by contrast, is a consumer-side property, taken from the ConsumerRecord:

    public class ConsumerRecord<K, V> {
    
        private final String topic;
        private final int partition;
        private final long offset;
        private final long timestamp;
        private final Headers headers;
        private final K key;
        private final V value;
    

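    In other words, Apache Kafka does not let a producer choose an offset: the broker assigns each record the next offset in the partition log, which is why your offsets start at 0 and increase by one.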
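Because the offset cannot be set on the producer side, a test that needs a record at a particular offset has to work with the broker's numbering instead. The toy Java model below sketches that idea; it uses plain collections, not the Kafka API, and the class and method names (PartitionLogModel, appendAt, readFrom) are hypothetical. It assumes a fresh, single-partition, non-transactional topic, as in a typical EmbeddedKafka test: since the log assigns offsets from 0, you can pad the partition with filler records until the next offset is the one you want, then have the consumer start reading there. In a real test the rough counterparts would be reading the assigned offset from SendResult.getRecordMetadata().offset() after kafkaTemplate.send(...), and seeking with ConsumerSeekCallback.seek(topic, partition, offset) in a ConsumerSeekAware listener (or KafkaConsumer.seek(TopicPartition, long)).

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of ONE Kafka partition log. This is NOT the Kafka API;
// all names here are hypothetical. It mirrors one rule: the log, not the
// producer, assigns offsets, starting at 0 and growing by one per record.
public class PartitionLogModel {

    // Hypothetical stand-in for a stored record: broker-assigned offset + value.
    static final class StoredRecord {
        final long offset;
        final String value;

        StoredRecord(long offset, String value) {
            this.offset = offset;
            this.value = value;
        }

        @Override
        public String toString() {
            return offset + ":" + value;
        }
    }

    private final List<StoredRecord> log = new ArrayList<>();

    // Like a producer send: the caller supplies only a value and gets back
    // the offset the log chose (compare RecordMetadata.offset()).
    long append(String value) {
        long offset = log.size();
        log.add(new StoredRecord(offset, value));
        return offset;
    }

    // Hypothetical test helper: pads the partition with filler records so that
    // `value` lands exactly at targetOffset. Fails if that offset is already used.
    long appendAt(long targetOffset, String value) {
        if (targetOffset < log.size()) {
            throw new IllegalStateException("offset " + targetOffset + " is already taken");
        }
        while (log.size() < targetOffset) {
            append("filler-" + log.size());
        }
        return append(value);
    }

    // Like a consumer that seeks to startOffset and then polls:
    // returns every record from startOffset to the end of the log.
    List<StoredRecord> readFrom(long startOffset) {
        if (startOffset >= log.size()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(log.subList((int) startOffset, log.size()));
    }

    public static void main(String[] args) {
        PartitionLogModel partition = new PartitionLogModel();
        System.out.println(partition.append("a"));           // prints 0
        System.out.println(partition.append("b"));           // prints 1
        System.out.println(partition.appendAt(5, "target")); // prints 5
        System.out.println(partition.readFrom(5));           // prints [5:target]
    }
}
```

The padding trick only holds while the test owns the partition: if anything else writes to it between the filler records and the real send, or if the producer is transactional (transaction control markers also occupy offsets), the record will not land at the expected offset, so checking the offset actually returned by the send is the safer assertion.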