
Is message order not guaranteed using KafkaEmbedded?


I wrote a unit test using KafkaEmbedded (and KafkaTemplate), but the messages are received in a random order. Is this expected, and is it possible to guarantee the order?

Here is my code:

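import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;

public class KafkaTest {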

  private static String TOPIC = "test.topic";

  @ClassRule
  public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TOPIC);

  @Test
  public void testEmbeddedKafkaSendOrder() throws Exception {
    Map<String, Object> producerConfig = new HashMap<>();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

    KafkaTemplate<String, byte[]> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig));
    kafkaTemplate.send(TOPIC, "TEST1".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST2".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST3".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST4".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST5".getBytes()).get();

    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-test-group");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final Consumer<String, byte[]> consumer = new KafkaConsumer<>(consumerConfig);
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, TOPIC);
    ConsumerRecords<String, byte[]> records = consumer.poll(100L);

    // Tests
    final Iterator<ConsumerRecord<String, byte[]>> recordIterator = records.iterator();
    while (recordIterator.hasNext()) {
      System.out.println("received:" + new String(recordIterator.next().value()));
    }
  }
}

This code prints, for example, the following (the order changes from run to run):

received:TEST2
received:TEST4
received:TEST1
received:TEST3
received:TEST5

Solution

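  • Kafka guarantees message order only within a single partition, not across a whole topic. KafkaEmbedded creates each topic with two partitions by default, so the records in this test are spread over two partitions and do not necessarily come back in the order they were sent.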

    Note that as a topic typically has multiple partitions, there is
    no guarantee of message time-ordering across the entire topic, just within a single
    partition
    

    (Quote from the book Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale.)

    So how can you receive the messages in order?

    Option 1: send every record with the same key:

            kafkaTemplate.send(TOPIC,"1", "TEST1".getBytes()).get();
            kafkaTemplate.send(TOPIC,"1", "TEST2".getBytes()).get();
            kafkaTemplate.send(TOPIC,"1", "TEST3".getBytes()).get();
            kafkaTemplate.send(TOPIC,"1", "TEST4".getBytes()).get();
            kafkaTemplate.send(TOPIC,"1", "TEST5".getBytes()).get();
    

    This way every record is sent with the same key, "1". The default partitioner chooses the partition from a hash of the key, so records with equal keys always land in the same partition, and you receive them in the order they were sent.
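    As a rough illustration of why equal keys end up together, here is a toy Java sketch that maps a key to a partition with a deterministic hash. It is not Kafka's partitioner: the real default partitioner hashes the serialized key bytes with murmur2, whereas this sketch uses Arrays.hashCode as a stand-in. The class and method names (KeyPartitioningSketch, partitionFor) are hypothetical, and the partition count of 2 is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of key-based partitioning; NOT Kafka's real partitioner.
// Kafka's default partitioner hashes the serialized key with murmur2;
// Arrays.hashCode is used here only as a stand-in deterministic hash.
public class KeyPartitioningSketch {

    // Hypothetical helper: map a key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 2; // assumed partition count for the test topic
        List<Integer> chosen = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            // Same key "1" for every record, as in Option 1.
            chosen.add(partitionFor("1", numPartitions));
        }
        System.out.println("partitions chosen: " + chosen);
    }
}
```

    Running main prints partitions chosen: [0, 0, 0, 0, 0]. The exact index depends on the hash, but it is the same for every record with key "1", which is the property Option 1 relies on.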

    Option 2: Initialize KafkaEmbedded this way:

    new KafkaEmbedded(1, true, 1, TOPIC);
    

    The third constructor argument is the number of partitions per topic. With only one partition, every record goes to that partition, and the consumer receives them in the order they were sent.
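To see why the output in the question looks shuffled and why a single partition fixes it, the following toy Java model (hypothetical names, no Kafka involved) treats each partition as a FIFO list. It assumes that records without a key are assigned round-robin across partitions (roughly what older Kafka clients did; newer clients use a "sticky" strategy instead) and that the consumer reads one partition after another.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy in-memory model of a topic: NOT Kafka code, just an illustration.
// Assumptions: records without a key are assigned round-robin, and the
// consumer drains partitions one after another, in partition order.
public class PartitionOrderSketch {

    // Each inner list is one partition, an append-only log with FIFO order.
    static List<List<String>> produce(List<String> values, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < values.size(); i++) {
            // Round-robin assignment for records sent without a key.
            partitions.get(i % numPartitions).add(values.get(i));
        }
        return partitions;
    }

    // Reads every partition in turn; order is kept only within each partition.
    static List<String> consume(List<List<String>> partitions) {
        List<String> received = new ArrayList<>();
        for (List<String> partition : partitions) {
            received.addAll(partition);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("TEST1", "TEST2", "TEST3", "TEST4", "TEST5");
        System.out.println("2 partitions: " + consume(produce(sent, 2)));
        System.out.println("1 partition:  " + consume(produce(sent, 1)));
    }
}
```

With two partitions the model prints [TEST1, TEST3, TEST5, TEST2, TEST4]: each partition keeps its own order, but the topic as a whole does not. In a real cluster, which partition each record lands in and which partition is returned first can vary, which is why the output in the question changes between runs. With one partition the output matches the send order.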