I have this image of kafka running in a container:
service-kafka:
container_name: pepito-kafka-server
hostname: pepito
image: apache/kafka:3.7.1
networks:
- pepito-network
ports:
- 8484:9092
extra_hosts:
- "docker.internal:127.0.0.1"
healthcheck:
test: /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server pepito:9092 || exit 1
interval: 1s
timeout: 60s
retries: 60
as you can see I have a custom port for kafka,
now from my springboot app I'm trying to use it like this:
@Configuration
public class KafkaTopicConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
}
and,
@Configuration
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
and,
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${pepito.group-id}")
private String groupId;
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@KafkaListener(topics = "${spring.application.name}", groupId = "${pepito.group-id}")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group " + groupId + ": " + message);
}
}
I have a producer and a consumer, because the idea for this app is to send as well as receive and process messages,
now, I'm getting these errors:
[36morg.apache.kafka.clients.Metadata [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Cluster ID: 5L6g3nShT-eMCtK--X85sw
[36mo.a.k.c.c.internals.ConsumerCoordinator [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Discovered group coordinator localhost:9092 (id: 2147483546 rack: null)
[36mo.a.k.c.c.internals.ConsumerCoordinator [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] (Re-)joining group
[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 2147453646 disconnected.
[2m2025-04-21T15:43:36.068-06:00[0;39m [33m WARN[0;39m [35m4088[0;39m [2m--- [pepito-app] [ntainer#0-0-C-1] [0;39m[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 2147483636 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
[36mo.a.k.c.c.internals.ConsumerCoordinator [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid due to cause: null. isDisconnected: true. Rediscovery will be attempted.
[36mo.a.k.c.c.internals.ConsumerCoordinator [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Request joining group due to: rebalance failed due to 'null' (DisconnectException)
[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
[2m2025-04-21T15:43:36.087-06:00[0;39m [33m WARN[0;39m [35m4088[0;39m [2m--- [pepito-app] [ntainer#0-0-C-1] [0;39m[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
[2m2025-04-21T15:43:36.199-06:00[0;39m [33m WARN[0;39m [35m4088[0;39m [2m--- [pepito-app] [ntainer#0-0-C-1] [0;39m[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
[2m2025-04-21T15:43:36.302-06:00[0;39m [33m WARN[0;39m [35m4088[0;39m [2m--- [pepito-app] [ntainer#0-0-C-1] [0;39m[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
[2m2025-04-21T15:43:36.548-06:00[0;39m [33m WARN[0;39m [35m4088[0;39m [2m--- [pepito-app] [ntainer#0-0-C-1] [0;39m[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
[2m2025-04-21T15:43:36.998-06:00[0;39m [33m WARN[0;39m [35m4088[0;39m [2m--- [pepito-app] [ntainer#0-0-C-1] [0;39m[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Node 1 disconnected.
[2m2025-04-21T15:43:37.800-06:00[0;39m [33m WARN[0;39m [35m4088[0;39m [2m--- [pepito-app] [ntainer#0-0-C-1] [0;39m[36morg.apache.kafka.clients.NetworkClient [0;39m [2m:[0;39m [Consumer clientId=consumer-pepito.app-1, groupId=pepito.app] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
It's trying to connect to 127.0.0.1:9092 the default host/port for springboot,
Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
and I have in my application.properties
spring:
kafka:
bootstrap-servers: "pepito:8484" # in my hosts pepito is pointing to where docker is running
where else in my configuration do I have to indicate to connect ONLY to pepito:8484, and no other ip/port? also, I did not change a thing in my container image of kafka, do I need to set something there? any help is welcomed,
So basically, your Spring Boot app is like, "Hey Kafka, I'm knocking on pepito:8484, let’s chat!"
Kafka opens the door and goes, "Cool, nice to meet you! But if you wanna keep talking, hit me up at localhost:9092 from now on."
And your poor app is like, "Uh... okay, I guess..."
Then it tries to call localhost:9092, but obviously, there's no one home....connection errors.
In your service-kafka
container, add the env variable ADVERTISED_LISTENERS
as:
service-kafka:
container_name: pepito-kafka-server
hostname: pepito
image: apache/kafka:3.7.1
networks:
- pepito-network
ports:
- 8484:9092
environment:
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://pepito:8484 #here
extra_hosts:
- "docker.internal:127.0.0.1"
healthcheck:
test: /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server pepito:9092 || exit 1
interval: 1s
timeout: 60s
retries: 60
This time, kafka will tell the app "its okay, now we can talk".