javaapache-kafkagatling

Kafka load test on Gatling


Gatling as maven dependencies:

<plugin>
<groupId>io.gatling</groupId>
<artifactId>gatling-maven-plugin</artifactId>
<version>4.19.1</version>
</plugin>

<dependencies>
    <dependency>
      <groupId>io.gatling.highcharts</groupId>
      <artifactId>gatling-charts-highcharts</artifactId>
      <version>3.14.3</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>4.0.0</version>
    </dependency>
</dependencies>

I have created load test:

import io.gatling.javaapi.core.*;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static io.gatling.javaapi.core.CoreDsl.*;

public class KafkaLoadTesting extends Simulation {
    private static final String TOPIC = "likes-bucket";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";


    private static void sendKafkaMessage(String message) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
        producer.send(record);
        producer.close();
    }

    ChainBuilder kafkaAction = exec(session -> {
        String payload = "{\"talkName\":\"Spring best practice\",\"likes\":1}";
        sendKafkaMessage(payload);
        return session;
    });

    {
        setUp(
                scenario("Kafka Load Test")
                        .exec(kafkaAction)
                        .pause(1, TimeUnit.SECONDS.ordinal())
                        .repeat(100).on(kafkaAction)
                        .injectOpen(atOnceUsers(10))
        ).protocols();
    }
}

I run the test ./mvnw clean gatling:test My java version: java 23.0.2 2025-01-21 I dont see any messages in kafka but i got loop. What i do wrong and what is correct way to send message to broker? i dont see any error on console.

enter image description here


Solution

  • I recommend using a ready-made plugin to load Kafka — creating everything from scratch can be quite tough.
    I'm the author of this plugin: https://github.com/Amerousful/gatling-kafka — feel free to check it out.
    It’s supported up to Gatling 3.9.5. The plugin provides various ways to load Kafka: you can just send a message, send and wait for a reply, or even only wait for replies.\

    For instance, this is a common way just to send a message:

    .exec(
        kafka("Kafka: fire and forget")
            .send()
            .topic("input_topic")
            .payload("#{payload}")
            .key("#{key}")
            .header("k1", "v1")
            .headers(Collections.singletonMap("key", "value"))
         )
    

    Protocol part:

    KafkaProtocolBuilder kafkaProtocol = kafka
                .broker(KafkaBroker("kafka.us-east-1.amazonaws.com", 9096))
                .brokers(
                        KafkaBroker("kafka.us-east-2.amazonaws.com", 9096),
                        KafkaBroker("kafka.us-east-3.amazonaws.com", 9096)
                )
                .acks("1")
                .producerIdenticalSerializer("org.apache.kafka.common.serialization.StringSerializer")
                .consumerIdenticalDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
                .addProducerProperty("retries", "3")
                .addConsumerProperty("heartbeat.interval.ms", "3000")
                .credentials("admin", "password", true, SaslMechanism.plain())
                .replyTimeout(10)
                .matchByKey()
                .matchByValue()
                .messageMatcher(customMatcher)
                .replyConsumerName("gatling-test-consumer");
    

    There’s also another plugin: https://github.com/galax-io/gatling-kafka-plugin The main advantage is that it supports newer Gatling versions, so it might be a better fit for you since you're using version 3.14.3.

    One more point — supported serializations: this plugin supports Avro, while mine supports Protobuf.