I am working through the Spring Boot Kafka example from Confluent. When I run the simple producer example, I get the error below. I am on a Windows machine with Ubuntu 14.04 LTS installed on Windows.
Note: Even if I use localhost, it still doesn't work from the code.
2021-05-30 21:14:23.916 INFO 10300 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-05-30 21:14:23.928 INFO 10300 --- [ main] c.e.demo.HelloWorldKafkaApplication : Started HelloWorldKafkaApplication in 2.619 seconds (JVM running for 3.694)
2021-05-30 21:14:23.931 INFO 10300 --- [ main] com.example.demo.KafkaProducerService : Producing Message- Key: 1, Value: {"name": "John", "age": 48}
2021-05-30 21:14:23.970 INFO 10300 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [127.0.0.1:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = true
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2021-05-30 21:14:24.068 INFO 10300 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-05-30 21:14:24.071 INFO 10300 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-05-30 21:14:24.071 INFO 10300 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1622389464066
2021-05-30 21:14:26.079 WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2021-05-30 21:14:26.079 WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected
2021-05-30 21:14:28.182 WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2021-05-30 21:14:28.182 WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected
The Control Center is also accessible.
KafkaProducerService.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
/**
 * Publishes keyed string messages to the Kafka topic configured under {@code topic.name}.
 */
@Slf4j
@Service
public class KafkaProducerService {

    /** Destination topic, injected from the {@code topic.name} property. */
    @Value("${topic.name}")
    private String topicName;

    /** Template used to hand records over to the Kafka producer. */
    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    /**
     * Logs the record and passes it to the {@link KafkaTemplate} for sending.
     *
     * @param key   record key
     * @param value record value
     */
    public void sendMessage(Integer key, String value) {
        // Parameterized logging lets the logger build the message only when INFO is enabled,
        // instead of always formatting the string up front.
        log.info("Producing Message- Key: {}, Value: {}", key, value);

        // NOTE(review): the value returned by send(...) is discarded, so this method does not
        // report whether the record was actually delivered.
        kafkaTemplate.send(topicName, key, value);
    }
}
HelloWorldKafkaApplication.java
/**
 * Application entry point. Its {@link CommandLineRunner} callback sends three sample
 * user records through {@link KafkaProducerService}.
 */
@SpringBootApplication
public class HelloWorldKafkaApplication implements CommandLineRunner {

    @Autowired
    private KafkaProducerService producerService;

    /** Boots the Spring application with the given command-line arguments. */
    public static void main(String[] args) {
        SpringApplication.run(HelloWorldKafkaApplication.class, args);
    }

    /** Sends each sample payload, in declaration order, under the key {@code 1}. */
    @Override
    public void run(String... args) throws Exception {
        final String[] samplePayloads = {
            "{\"name\": \"John\", \"age\": 48}",
            "{\"name\": \"Harshita\", \"age\": 29}",
            "{\"name\": \"Laxmi\", \"age\": 63}"
        };

        for (String payload : samplePayloads) {
            producerService.sendMessage(1, payload);
        }
    }
}
application.yml
spring:
  kafka:
    producer:
      # Broker address the producer bootstraps from.
      bootstrap-servers: 127.0.0.1:9092
      # Keys are Integers and values are Strings, matching KafkaTemplate<Integer, String>.
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

# Custom property read by KafkaProducerService through @Value("${topic.name}").
topic:
  name: users
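Following guidance from @OneCricketeer and @manishKumarPandey, the fix is to run the command below, which forwards port 9092 on the Windows host to the WSL2 instance's IP address. The WSL2 IP changes every time the machine restarts, so the mapping has to be re-created with the new IP after each reboot.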
C:\WINDOWS\system32>netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=<IP OF YOUR WSL2>