I am writing an integration test for an application that connects to Kafka to consume and publish data and for that purpose I am using EmbeddedKafka. Part of the logic is to consume messages with specific offsets. I want to simulate this, therefore my goal is to:
This doesn't work now, i.e. I'm sending messages with KafkaHeaders.OFFSET, but it's ignored: the message that I consume afterwards has a different offset. In fact the offset just starts at 0 and is then incremented.
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(payload)
        .setHeader(KafkaHeaders.TOPIC, topic)
        .setHeader(KafkaHeaders.MESSAGE_KEY, key)
        .setHeader(KafkaHeaders.PARTITION_ID, partition)
        .setHeader(KafkaHeaders.OFFSET, val);
kafkaTemplate.send(messageBuilder.build());
KafkaTemplate<String, String> is initialised in a standard way. On the other end I consume in a standard way:
private class MessagesWithOffsetsConsumer implements BatchMessageListener<String, String>, ConsumerSeekAware {
    MessagesWithOffsetsConsumer() {
    }
    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records) {
        records.forEach(record -> {
            String id = record.key();
            String dataAssetPayload = record.value();
            int partitionId = record.partition();
            LOGGER.info("Received record: {} offset: {}", id, record.offset());
        });
    }
}
In short, the offset of a message received in onMessage is not the same as the one set during message building.
Is there any way to achieve this?
No, there is no such thing as an explicit offset value on the ProducerRecord.
You can only set these properties:
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
And that's really what the KafkaTemplate logic does. KafkaHeaders.OFFSET is a consumer-side property:
public class ConsumerRecord<K, V> {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final Headers headers;
private final K key;
private final V value;
In other words: Apache Kafka is not designed for setting an offset explicitly. The broker assigns the offset when it appends the record to the partition.
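To make the reasoning concrete, here is a minimal toy model in plain Java. It is not Kafka code: `PartitionLog`, `OutgoingRecord` and `StoredRecord` are hypothetical names made up for this sketch, and a real broker is far more involved. It only illustrates the idea that the sender hands over a record without an offset, the log decides the offset on append, and a reader can then start from an offset it learned after the fact. In a real test the analogous pieces would be the offset reported in the `RecordMetadata` of the send result and a seek on the consumer side.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetAssignmentSketch {

    /** Producer-side record (hypothetical): like ProducerRecord, it carries no offset. */
    static final class OutgoingRecord {
        final String key;
        final String value;

        OutgoingRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Consumer-side record (hypothetical): the offset was filled in by the log. */
    static final class StoredRecord {
        final long offset;
        final String key;
        final String value;

        StoredRecord(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /** Toy stand-in for a single partition: an append-only list. */
    static final class PartitionLog {
        private final List<StoredRecord> records = new ArrayList<>();

        /** Appends at the end and returns the offset chosen by the log, not by the caller. */
        long append(OutgoingRecord outgoing) {
            long assignedOffset = records.size();
            records.add(new StoredRecord(assignedOffset, outgoing.key, outgoing.value));
            return assignedOffset;
        }

        /** Returns a copy of all records from the given offset onwards, similar to reading after a seek. */
        List<StoredRecord> readFrom(long offset) {
            return new ArrayList<>(records.subList((int) offset, records.size()));
        }
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        log.append(new OutgoingRecord("k1", "first"));
        // Remember the offset the log assigned instead of trying to dictate one.
        long wanted = log.append(new OutgoingRecord("k2", "second"));
        log.append(new OutgoingRecord("k3", "third"));

        for (StoredRecord r : log.readFrom(wanted)) {
            System.out.println(r.offset + " " + r.key + " " + r.value);
        }
    }
}
```

The design point for a test is the direction of the data flow: rather than pushing a desired offset into the message, publish first, note the offsets that were assigned, and have the consuming side start from those.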