acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [localhost:9092, localhost:9094, localhost:9096]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-4
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
enable.metrics.push = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.adaptive.partitioning.enable = true
partitioner.availability.timeout.ms = 0
partitioner.class = null
partitioner.ignore.keys = false
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 3
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2024-10-18T20:00:49.577+03:00 INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
2024-10-18T20:00:49.577+03:00 INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-4] Instantiated an idempotent producer.
2024-10-18T20:00:49.580+03:00 INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.7.0
2024-10-18T20:00:49.580+03:00 INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 2ae524ed625438c5
2024-10-18T20:00:49.580+03:00 INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1729270849580
2024-10-18T20:00:49.583+03:00 INFO 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-4] Node -2 disconnected.
2024-10-18T20:00:49.583+03:00 INFO 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-4] Cancelled in-flight API_VERSIONS request with correlation id 0 due to node -2 being disconnected (elapsed time since creation: 1ms, elapsed time since send: 1ms, request timeout: 30000ms)
2024-10-18T20:00:49.583+03:00 WARN 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-4] Bootstrap broker localhost:9094 (id: -2 rack: null) disconnected
2024-10-18T20:00:49.587+03:00 INFO 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-4] Node -3 disconnected.
2024-10-18T20:00:49.587+03:00 INFO 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-4] Cancelled in-flight API_VERSIONS request with correlation id 3 due to node -3 being disconnected (elapsed time since creation: 1ms, elapsed time since send: 1ms, request timeout: 30000ms)
2024-10-18T20:00:49.587+03:00 WARN 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-4] Bootstrap broker localhost:9096 (id: -3 rack: null) disconnected
2024-10-18T20:00:49.587+03:00 INFO 5480 --- [card-composer-service] [ad | producer-4] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-4] ProducerId set to 1003 with epoch 0
2024-10-18T20:00:49.701+03:00 INFO 5480 --- [card-composer-service] [ad | producer-4] org.apache.kafka.clients.Metadata : [Producer clientId=producer-4] Cluster ID: nQCoigKGSP22Wno_34Ez0Q
2024-10-18T20:00:49.709+03:00 INFO 5480 --- [card-composer-service] [o-auto-1-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
I get this error when I'm trying to execute request :
{
"listOfWords": "side;main;plot;joke;joker;circus;",
"onlyTranslate": false
}
My ProducerConfig:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Component
public class KafkaProducerRequest {
public void sendMessage(String message) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9094,localhost:9096");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, "3");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("image-handler-topic", message);
producer.send(producerRecord);
producer.flush();
producer.close();
}
}
DockerFile:
version: "3.9"
networks:
main-network:
driver: bridge
volumes:
volume-kafka-1:
volume-kafka-2:
volume-kafka-3:
services:
kafka1:
image: 'bitnami/kafka:latest'
ports:
- "9092:9092"
networks:
- main-network
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=0
- KAFKA_BROKER_ID=0
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka1:9093,1@kafka2:9095,2@kafka3:9097
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_KRAFT_CLUSTER_ID=nQCoigKGSP22Wno_34Ez0Q
volumes:
- volume-kafka-1:/bitnami/kafka
kafka2:
image: 'bitnami/kafka:latest'
ports:
- "9094:9092"
networks:
- main-network
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=1
- KAFKA_BROKER_ID=1
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9094,CONTROLLER://0.0.0.0:9095
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka1:9093,1@kafka2:9095,2@kafka3:9097
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_KRAFT_CLUSTER_ID=nQCoigKGSP22Wno_34Ez0Q
volumes:
- volume-kafka-2:/bitnami/kafka
kafka3:
image: 'bitnami/kafka:latest'
ports:
- "9096:9092"
networks:
- main-network
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=2
- KAFKA_BROKER_ID=2
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9096,CONTROLLER://0.0.0.0:9097
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9096
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka1:9093,1@kafka2:9095,2@kafka3:9097
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_KRAFT_CLUSTER_ID=nQCoigKGSP22Wno_34Ez0Q
volumes:
- volume-kafka-3:/bitnami/kafka
ui:
image: provectuslabs/kafka-ui:v0.7.1
ports:
- "9898:8080"
environment:
- KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9094,kafka3:9096
- KAFKA_CLUSTERS_0_NAME=kraft
networks:
- main-network
I think it's something wrong with my Kafka Configuration but I can't realize the problem because I'm new to Apache Kafka Kraft D: so I wish you could help me.
My consumer also shows this log (I have added a third node) :
2024-10-18T20:04:41.029+03:00 INFO 16880 --- [image-handler-service] [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-instance-1-1, groupId=instance-1] Discovered group coordinator kafka3:9096 (id: 2147483645 rack: null)
2024-10-18T20:04:41.029+03:00 INFO 16880 --- [image-handler-service] [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-instance-1-1, groupId=instance-1] Group coordinator kafka3:9096 (id: 2147483645 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be attempted.
2024-10-18T20:04:41.029+03:00 INFO 16880 --- [image-handler-service] [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-instance-1-1, groupId=instance-1] Requesting disconnect from last known coordinator kafka3:9096 (id: 2147483645 rack: null)
minikube + strimzi were really helpful althought a little harder to configure. I fixed this issue a longer time ago but have answered only now :D