I wrote a unit test using KafkaEmbedded (and KafkaTemplate), but the messages are received in a random order. Is that expected, and is it possible to guarantee the order?
Here is my code:
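import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;

public class KafkaTest {
    private static final String TOPIC = "test.topic";

    // One embedded broker (controlled shutdown enabled) that creates TOPIC on startup
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TOPIC);

    @Test
    public void testEmbeddedKafkaSendOrder() throws Exception {
        // Producer: String keys, byte[] values
        Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        KafkaTemplate<String, byte[]> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig));
        // Send five records without a key; get() blocks until each send completes
        kafkaTemplate.send(TOPIC, "TEST1".getBytes()).get();
        kafkaTemplate.send(TOPIC, "TEST2".getBytes()).get();
        kafkaTemplate.send(TOPIC, "TEST3".getBytes()).get();
        kafkaTemplate.send(TOPIC, "TEST4".getBytes()).get();
        kafkaTemplate.send(TOPIC, "TEST5".getBytes()).get();

        // Consumer: read the topic from the earliest offset
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-test-group");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        final Consumer<String, byte[]> consumer = new KafkaConsumer<>(consumerConfig);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, TOPIC);
        ConsumerRecords<String, byte[]> records = consumer.poll(100L);

        // Print the values in the order poll() returned them
        final Iterator<ConsumerRecord<String, byte[]>> recordIterator = records.iterator();
        while (recordIterator.hasNext()) {
            System.out.println("received:" + new String(recordIterator.next().value()));
        }
        consumer.close();
    }
}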
This code prints, for example, the following (the order changes from run to run):
received:TEST2
received:TEST4
received:TEST1
received:TEST3
received:TEST5
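In Kafka, message order is guaranteed only within a single partition, not across a whole topic. By default, KafkaEmbedded creates each topic with two partitions, and records sent without a key can be spread across both of them, so the consumer may return them in a different order than they were sent.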
Note that as a topic typically has multiple partitions, there is no guarantee of message time-ordering across the entire topic, just within a single partition.
Quote from the book Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale.
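To see how the output in the question can arise, here is a minimal, dependency-free Java simulation. It is a toy model, not Kafka's implementation: the hypothetical class PartitionOrderDemo treats each partition as an append-only list, assigns keyless records round-robin (an assumption; real producers may use a different strategy, such as sticky partitioning), and has the consumer drain one partition after another. With two partitions and the second one read first, it reproduces the question's output: TEST2 and TEST4 (one partition) come before TEST1, TEST3, TEST5 (the other), and each group is still in send order.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionOrderDemo {

    // Toy model: a topic is a list of partitions, each an append-only log.
    // Keyless records are assigned round-robin (a simplifying assumption).
    static List<List<String>> produceRoundRobin(List<String> values, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < values.size(); i++) {
            partitions.get(i % numPartitions).add(values.get(i));
        }
        return partitions;
    }

    // Toy consumer: reads each partition in offset order, visiting partitions
    // one after another starting at 'start'. The visiting order is arbitrary.
    static List<String> consume(List<List<String>> partitions, int start) {
        List<String> out = new ArrayList<>();
        int n = partitions.size();
        for (int k = 0; k < n; k++) {
            out.addAll(partitions.get((start + k) % n));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> sent = List.of("TEST1", "TEST2", "TEST3", "TEST4", "TEST5");

        // Two partitions: per-partition order holds, topic-wide order does not
        System.out.println(consume(produceRoundRobin(sent, 2), 1));
        // prints [TEST2, TEST4, TEST1, TEST3, TEST5]

        // One partition: the consumer sees exactly the send order
        System.out.println(consume(produceRoundRobin(sent, 1), 0));
        // prints [TEST1, TEST2, TEST3, TEST4, TEST5]
    }
}
```

With a single partition there is only one log to read, so the consumer's order always matches the send order.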
What can you do to receive the messages in order?
Option 1: Send every record with the same key:
kafkaTemplate.send(TOPIC, "1", "TEST1".getBytes()).get();
kafkaTemplate.send(TOPIC, "1", "TEST2".getBytes()).get();
kafkaTemplate.send(TOPIC, "1", "TEST3".getBytes()).get();
kafkaTemplate.send(TOPIC, "1", "TEST4".getBytes()).get();
kafkaTemplate.send(TOPIC, "1", "TEST5".getBytes()).get();
This sends the same key, "1", with every value. The producer's default partitioner chooses the partition by hashing the key, so records with equal keys always go to the same partition, and you receive them in the order they were sent.
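Why a fixed key keeps the records together can be sketched with another dependency-free toy model. The hypothetical partitionFor method uses String.hashCode() modulo the partition count as a simplified stand-in; Kafka's default partitioner instead hashes the serialized key bytes with murmur2, so the actual partition number will differ. The property that matters is the same: a deterministic hash maps equal keys to the same partition.

```java
import java.util.ArrayList;
import java.util.List;

public class KeyedPartitionDemo {

    // Simplified stand-in for a key-hashing partitioner (NOT Kafka's murmur2-based one):
    // a deterministic hash of the key, reduced to a valid partition index.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Appends every value to the partition chosen for the given key.
    static List<List<String>> produceWithKey(String key, List<String> values, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (String value : values) {
            partitions.get(partitionFor(key, numPartitions)).add(value);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> sent = List.of("TEST1", "TEST2", "TEST3", "TEST4", "TEST5");
        List<List<String>> partitions = produceWithKey("1", sent, 2);
        // "1".hashCode() is 49, so every record lands in partition 1, in send order
        System.out.println(partitions);
        // prints [[], [TEST1, TEST2, TEST3, TEST4, TEST5]]
    }
}
```

The trade-off is that all records sharing a key are confined to one partition, so only one consumer in the group can read them at a time.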
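Option 2: Initialize KafkaEmbedded with a single partition per topic:
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, TOPIC);
The third argument is the number of partitions, so you are telling Kafka that this topic should have only one partition. Every record then goes to that partition, and the topic-wide order is the same as the partition order.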