javadockerapache-kafkadocker-composekraft

Apache Kafka Kraft - Cancelled in-flight API_VERSIONS request


acks = -1
    auto.include.jmx.reporter = true
    batch.size = 16384
    bootstrap.servers = [localhost:9092, localhost:9094, localhost:9096]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-4
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    enable.metrics.push = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.adaptive.partitioning.enable = true
    partitioner.availability.timeout.ms = 0
    partitioner.class = null
    partitioner.ignore.keys = false
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 3
    retry.backoff.max.ms = 1000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2024-10-18T20:00:49.577+03:00  INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
2024-10-18T20:00:49.577+03:00  INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-4] Instantiated an idempotent producer.
2024-10-18T20:00:49.580+03:00  INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.7.0
2024-10-18T20:00:49.580+03:00  INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 2ae524ed625438c5
2024-10-18T20:00:49.580+03:00  INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1729270849580
2024-10-18T20:00:49.583+03:00  INFO 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-4] Node -2 disconnected.
2024-10-18T20:00:49.583+03:00  INFO 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-4] Cancelled in-flight API_VERSIONS request with correlation id 0 due to node -2 being disconnected (elapsed time since creation: 1ms, elapsed time since send: 1ms, request timeout: 30000ms)
2024-10-18T20:00:49.583+03:00  WARN 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-4] Bootstrap broker localhost:9094 (id: -2 rack: null) disconnected
2024-10-18T20:00:49.587+03:00  INFO 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-4] Node -3 disconnected.
2024-10-18T20:00:49.587+03:00  INFO 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-4] Cancelled in-flight API_VERSIONS request with correlation id 3 due to node -3 being disconnected (elapsed time since creation: 1ms, elapsed time since send: 1ms, request timeout: 30000ms)
2024-10-18T20:00:49.587+03:00  WARN 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-4] Bootstrap broker localhost:9096 (id: -3 rack: null) disconnected
2024-10-18T20:00:49.587+03:00  INFO 5480 --- [card-composer-service] [ad | producer-4] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-4] ProducerId set to 1003 with epoch 0
2024-10-18T20:00:49.701+03:00  INFO 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-4] Cluster ID: nQCoigKGSP22Wno_34Ez0Q
2024-10-18T20:00:49.709+03:00  INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.


I get this error when I'm trying to execute request :

{
    "listOfWords": "side;main;plot;joke;joker;circus;",
    "onlyTranslate": false
}

My ProducerConfig:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Component;

import java.util.Properties;

@Component
public class KafkaProducerRequest {
    public void sendMessage(String message) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9094,localhost:9096");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.RETRIES_CONFIG, "3");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("image-handler-topic", message);

        producer.send(producerRecord);
        producer.flush();
        producer.close();
    }
}

DockerFile:

version: "3.9"
networks:
  main-network:
    driver: bridge
volumes:
  volume-kafka-1:
  volume-kafka-2:
  volume-kafka-3:

services:
  kafka1:
    image: 'bitnami/kafka:latest'
    ports:
      - "9092:9092"
    networks:
      - main-network
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_BROKER_ID=0
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka1:9093,1@kafka2:9095,2@kafka3:9097
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_KRAFT_CLUSTER_ID=nQCoigKGSP22Wno_34Ez0Q
    volumes:
      - volume-kafka-1:/bitnami/kafka
  kafka2:
    image: 'bitnami/kafka:latest'
    ports:
      - "9094:9092"
    networks:
      - main-network
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9094,CONTROLLER://0.0.0.0:9095
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka1:9093,1@kafka2:9095,2@kafka3:9097
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_KRAFT_CLUSTER_ID=nQCoigKGSP22Wno_34Ez0Q
    volumes:
      - volume-kafka-2:/bitnami/kafka
  kafka3:
    image: 'bitnami/kafka:latest'
    ports:
      - "9096:9092"
    networks:
      - main-network
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_BROKER_ID=2
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9096,CONTROLLER://0.0.0.0:9097
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9096
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka1:9093,1@kafka2:9095,2@kafka3:9097
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_KRAFT_CLUSTER_ID=nQCoigKGSP22Wno_34Ez0Q
    volumes:
      - volume-kafka-3:/bitnami/kafka
  ui:
    image: provectuslabs/kafka-ui:v0.7.1
    ports:
      - "9898:8080"
    environment:
      - KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9094,kafka3:9096
      - KAFKA_CLUSTERS_0_NAME=kraft
    networks:
      - main-network

I think it's something wrong with my Kafka Configuration but I can't realize the problem because I'm new to Apache Kafka Kraft D: so I wish you could help me.

My consumer also shows this log (I have added a third node) :

2024-10-18T20:04:41.029+03:00  INFO 16880 --- [image-handler-service] [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-instance-1-1, groupId=instance-1] Discovered group coordinator kafka3:9096 (id: 2147483645 rack: null)
2024-10-18T20:04:41.029+03:00  INFO 16880 --- [image-handler-service] [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-instance-1-1, groupId=instance-1] Group coordinator kafka3:9096 (id: 2147483645 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be attempted.
2024-10-18T20:04:41.029+03:00  INFO 16880 --- [image-handler-service] [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-instance-1-1, groupId=instance-1] Requesting disconnect from last known coordinator kafka3:9096 (id: 2147483645 rack: null)

Solution

  • minikube + strimzi were really helpful althought a little harder to configure. I fixed this issue a longer time ago but have answered only now :D