apache-kafka spring-kafka

Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available


I am working through the Spring Boot Kafka example from Confluent. When I run the simple producer example, I get the error below. I am on a Windows machine with Ubuntu 14.04 LTS installed on Windows.

Note: even if I use localhost instead of 127.0.0.1, the connection still fails from code.

2021-05-30 21:14:23.916  INFO 10300 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-05-30 21:14:23.928  INFO 10300 --- [           main] c.e.demo.HelloWorldKafkaApplication      : Started HelloWorldKafkaApplication in 2.619 seconds (JVM running for 3.694)
2021-05-30 21:14:23.931  INFO 10300 --- [           main] com.example.demo.KafkaProducerService    : Producing Message- Key: 1, Value: {"name": "John", "age": 48}
2021-05-30 21:14:23.970  INFO 10300 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [127.0.0.1:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    internal.auto.downgrade.txn.commit = true
    key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2021-05-30 21:14:24.068  INFO 10300 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-05-30 21:14:24.071  INFO 10300 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-05-30 21:14:24.071  INFO 10300 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1622389464066
2021-05-30 21:14:26.079  WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2021-05-30 21:14:26.079  WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected
2021-05-30 21:14:28.182  WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2021-05-30 21:14:28.182  WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected

enter image description here

The Control Center is also accessible:

enter image description here

KafkaProducerService.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

/**
 * Publishes keyed string messages to the Kafka topic named by the
 * {@code topic.name} property.
 */
@Slf4j
@Service
public class KafkaProducerService {

    /** Destination topic, resolved from the {@code topic.name} property. */
    @Value("${topic.name}")
    private String topic;

    /** Template through which records are handed to the Kafka producer. */
    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    /**
     * Logs the record at INFO level and sends it to the configured topic.
     *
     * @param key   record key
     * @param value record payload
     */
    public void sendMessage(Integer key, String value) {
        // Parameterized logging defers message formatting to the logger, so no
        // string is built when INFO is disabled; the rendered text is unchanged.
        log.info("Producing Message- Key: {}, Value: {}", key, value);
        // NOTE(review): the result of send() is discarded, so a delivery failure
        // is not reported by this method; confirm that is acceptable.
        kafkaTemplate.send(topic, key, value);
    }
}

HelloWorldKafkaApplication.java

/**
 * Application entry point. Implements {@link CommandLineRunner} so that
 * {@link #run(String...)} publishes three sample user records, all with key 1.
 */
@SpringBootApplication
public class HelloWorldKafkaApplication implements CommandLineRunner {

    /** Service used to publish the sample records. */
    @Autowired
    private KafkaProducerService producerService;

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(HelloWorldKafkaApplication.class, args);
    }

    /**
     * Sends each sample JSON payload, in order, under the same key.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        final String[] samplePayloads = {
            "{\"name\": \"John\", \"age\": 48}",
            "{\"name\": \"Harshita\", \"age\": 29}",
            "{\"name\": \"Laxmi\", \"age\": 63}"
        };
        for (String payload : samplePayloads) {
            producerService.sendMessage(1, payload);
        }
    }

}

application.yml

# Kafka producer settings and the name of the topic the application writes to.
spring:
  kafka:
    producer:
      # Broker address the producer uses for its initial (bootstrap) connection.
      bootstrap-servers: 127.0.0.1:9092
      # Record keys are serialized as integers, record values as strings.
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
# Application property read through the ${topic.name} placeholder.
topic:
  name: users

server.properties enter image description here


Solution

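  • Following guidance from @OneCricketeer and @manishKumarPandey: the broker runs inside WSL2, whose IP address changes on every machine restart, so localhost:9092 on Windows does not reach it. Use the command below (from an administrator command prompt) to forward port 9092 on the Windows host to the current WSL2 IP address.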

    C:\WINDOWS\system32>netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=<IP OF YOUR WSL2>